2013-05-08 18 views
15

Ho il seguente codice che scrive i md5sum ad un file di logPython: il sottoprocesso in parallelo

for file in files_output: 
    p=subprocess.Popen(['md5sum',file],stdout=logfile) 
p.wait() 
  1. questi saranno scritti in parallelo? Ad esempio, se md5sum impiega molto tempo per uno dei file, ne verrà avviato un altro prima di attendere il completamento di uno precedente?

  2. Se la risposta a quanto sopra è sì, posso supporre che l'ordine dei md5sum scritti nel file di log possa differire in base alla durata di md5sum per ogni file? (alcuni file possono essere enormi, alcuni piccoli)

risposta

14

Tutti i processi secondari vengono eseguiti in parallelo. (Per evitare questo, è necessario attendere esplicitamente per il loro completamento.) Possono persino scrivere nel file di registro contemporaneamente, rendendo così impossibile l'output. Per evitare ciò, è necessario consentire a ogni processo di scrivere in un file di registro diverso e raccogliere tutti gli output al termine di tutti i processi.

q = Queue.Queue() 
result = {} # used to store the results 
for fileName in fileNames: 
    q.put(fileName) 

def worker(): 
    while True: 
    fileName = q.get() 
    if fileName is None: # EOF? 
     return 
    subprocess_stuff_using(fileName) 
    wait_for_finishing_subprocess() 
    checksum = collect_md5_result_for(fileName) 
    result[fileName] = checksum # store it 

threads = [ threading.Thread(target=worker) for _i in range(20) ] 
for thread in threads: 
    thread.start() 
    q.put(None) # one EOF marker for each thread 

Dopo questo i risultati devono essere conservati in result.

+0

Grazie. Tuttavia, ho 1000 di md5sum. Preferirei non aprire un file separato per ciascuno. – imagineerThat

+2

No, non dovresti. Crea un file 'Queue.Queue' e un pool di thread di alcune dozzine di thread, lascia che ogni thread legga un elemento dalla coda e avvii un sottoprocesso per questo elemento, attendi il completamento di questo sottoprocesso, ottieni il risultato (il checksum md5) , memorizza il risultato in una mappatura. Se la coda è vuota, i thread dovrebbero terminare. – Alfe

+0

Nuovo in Python ancora. Devo usare Queue.Queue per scrivere su un mapping contemporaneamente? In caso contrario, cosa fa Queue.Queue per me? – imagineerThat

19
  1. Sì, questi processi md5sum verranno avviati in parallelo.
  2. Sì, l'ordine delle scritture md5sums sarà imprevedibile. E generalmente è considerata una cattiva pratica condividere in questo modo una singola risorsa come il file di molti processi.

Anche il vostro modo di fare p.wait() dopo il ciclo for attenderà solo per l'ultimo dei processi di md5sum per finire e il resto di loro potrebbe essere ancora in esecuzione.

Ma è possibile modificare leggermente questo codice per avere ancora vantaggi dell'elaborazione parallela e della prevedibilità dell'output sincronizzato se si raccoglie l'output md5sum in file temporanei e lo si raccoglie in un unico file una volta completati tutti i processi.

import subprocess 
import os 

processes = [] 
for file in files_output: 
    f = os.tmpfile() 
    p = subprocess.Popen(['md5sum',file],stdout=f) 
    processes.append((p, f)) 

for p, f in processes: 
    p.wait() 
    f.seek(0) 
    logfile.write(f.read()) 
    f.close() 
+0

Quindi immagino che l'ordine qui sia conservato perché i processi [] ne tengono traccia? vale a dire. process.append ((p, f)) viene eseguito prima della fine di md5sum, nell'ordine di files_output. – imagineerThat

+2

Sì, 'processes []' manterrà l'ordine originale di 'files_output []' e si assicurerà che ogni processo md5sum sia terminato. Ma se sei preoccupato per le risorse del sistema operativo, dovresti considerare il pool di thread con la coda delle attività e l'esecuzione sincrona di md5sum in ogni thread con 'subprocess.check_output()' come proposto da @Alfe. – dkz

5

Un modo semplice per raccogliere uscita dal sottoprocessi md5sum parallelo è quello di utilizzare un pool di thread e scrivere il file dal processo principale:

from multiprocessing.dummy import Pool # use threads 
from subprocess import check_output 

def md5sum(filename): 
    try: 
     return check_output(["md5sum", filename]), None 
    except Exception as e: 
     return None, e 

if __name__ == "__main__": 
    p = Pool(number_of_processes) # specify number of concurrent processes 
    with open("md5sums.txt", "wb") as logfile: 
     for output, error in p.imap(md5sum, filenames): # provide filenames 
      if error is None: 
       logfile.write(output) 
  • l'uscita dal md5sum è piccolo in modo da può memorizzarlo nella memoria
  • imap conserva l'ordine
  • number_of_processes potrebbe essere diverso dal numero di file o CPU core (valori maggiori non significa più veloce: dipende dalle prestazioni relative di IO (dischi) e CPU)

È possibile provare a passare più file contemporaneamente ai sottoprocessi md5sum.

In questo caso non è necessario il sottoprocesso esterno; you can calculate md5 in Python:

import hashlib 
from functools import partial 

def md5sum(filename, chunksize=2**15, bufsize=-1): 
    m = hashlib.md5() 
    with open(filename, 'rb', bufsize) as f: 
     for chunk in iter(partial(f.read, chunksize), b''): 
      m.update(chunk) 
    return m.hexdigest() 

utilizzare più processi invece di fili (per consentire il puro Python md5sum() correre in parallelo utilizzando più CPU) solo cadere .dummy dall'importazione nel codice precedente.

+0

Scusa, sto ancora imparando qui. Non capisco perché Queues non è usato qui. Se più processi scrivono su file di log, non ci saranno problemi? Se mi sbaglio, come mai c'è una sincronizzazione? – imagineerThat

+0

Sembra che 'Pool' supporti le chiamate asincrone. Questo significa che mantiene l'ordine di md5 scritto (nell'ordine di 'nome file ')? A differenza del semplice avvio del numero x di thread? – imagineerThat

+1

'Pool' fornisce un'interfaccia di livello superiore. Utilizza 'Queue's internamente. Il file 'logfile' è accessibile solo dal thread principale (solo la funzione' md5sum() 'viene eseguita nei thread figli). 'imap()' restituisce i risultati in ordine (come ho già menzionato esplicitamente) – jfs

Problemi correlati