2011-10-29 20 views
12

Sto provando a scrivere una classe che calcoli i checksum utilizzando più processi, sfruttando quindi più core. Ho una classe abbastanza semplice per questo, e funziona benissimo quando si esegue un caso semplice. Ma ogni volta che creo due o più istanze della classe, il lavoratore non esce mai. Sembra che non abbia mai ricevuto il messaggio che la pipa è stata chiusa dal genitore.Utilizzo di pipe multiprocessing in python

Tutto il codice può essere trovato sotto. Per prima cosa calcolo i checksum md5 e sha1 separatamente, il che funziona, quindi provo a eseguire il calcolo in parallelo, quindi il programma si blocca quando è il momento di chiudere la pipe.

Cosa sta succedendo qui? Perché i tubi non funzionano come mi aspetto? Immagino che potrei fare una soluzione alternativa inviando un messaggio di "Stop" in coda e fare in modo che il bambino si dimetta in quel modo, ma mi piacerebbe davvero sapere perché questo non funziona così com'è.

import multiprocessing 
import hashlib 

class ChecksumPipe(multiprocessing.Process): 
    def __init__(self, csname): 
     multiprocessing.Process.__init__(self, name = csname) 
     self.summer = eval("hashlib.%s()" % csname) 
     self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False) 
     self.result_queue = multiprocessing.Queue(1) 
     self.daemon = True 
     self.start() 
     self.child_conn.close() # This is the parent. Close the unused end. 

    def run(self): 
     self.parent_conn.close() # This is the child. Close unused end. 
     while True: 
      try: 
       print "Waiting for more data...", self 
       block = self.child_conn.recv_bytes() 
       print "Got some data...", self 
      except EOFError: 
       print "Finished work", self 
       break 
      self.summer.update(block) 
     self.result_queue.put(self.summer.hexdigest()) 
     self.result_queue.close() 
     self.child_conn.close() 

    def update(self, block): 
     self.parent_conn.send_bytes(block) 

    def hexdigest(self): 
     self.parent_conn.close() 
     return self.result_queue.get() 


def main(): 
    # Calculating the first checksum works 
    md5 = ChecksumPipe("md5") 
    md5.update("hello") 
    print "md5 is", md5.hexdigest() 

    # Calculating the second checksum works 
    sha1 = ChecksumPipe("sha1") 
    sha1.update("hello") 
    print "sha1 is", sha1.hexdigest() 

    # Calculating both checksums in parallel causes a lockup! 
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1") 
    md5.update("hello") 
    sha1.update("hello") 
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() # Lockup here! 

main() 

PS. Questo problema è stato risolto Ecco una versione funzionante del codice di cui sopra, se qualcuno è interessato:

import multiprocessing 
import hashlib 

class ChecksumPipe(multiprocessing.Process): 

    all_open_parent_conns = [] 

    def __init__(self, csname): 
     multiprocessing.Process.__init__(self, name = csname) 
     self.summer = eval("hashlib.%s()" % csname) 
     self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False) 
     ChecksumPipe.all_open_parent_conns.append(self.parent_conn) 
     self.result_queue = multiprocessing.Queue(1) 
     self.daemon = True 
     self.start() 
     self.child_conn.close() # This is the parent. Close the unused end. 

    def run(self): 
     for conn in ChecksumPipe.all_open_parent_conns: 
      conn.close() # This is the child. Close unused ends. 
     while True: 
      try: 
       print "Waiting for more data...", self 
       block = self.child_conn.recv_bytes() 
       print "Got some data...", self 
      except EOFError: 
       print "Finished work", self 
       break 
      self.summer.update(block) 
     self.result_queue.put(self.summer.hexdigest()) 
     self.result_queue.close() 
     self.child_conn.close() 

    def update(self, block): 
     self.parent_conn.send_bytes(block) 

    def hexdigest(self): 
     self.parent_conn.close() 
     return self.result_queue.get() 

def main(): 
    # Calculating the first checksum works 
    md5 = ChecksumPipe("md5") 
    md5.update("hello") 
    print "md5 is", md5.hexdigest() 

    # Calculating the second checksum works 
    sha1 = ChecksumPipe("sha1") 
    sha1.update("hello") 
    print "sha1 is", sha1.hexdigest() 

    # Calculating both checksums also works fine now 
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1") 
    md5.update("hello") 
    sha1.update("hello") 
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() 

main() 
+0

È possibile aggiungere "ChecksumPipe.all_open_parent_conns.remove (self.parent_conn)" dopo 'self.parent_conn.close()' per consentire la distruzione della connessione. –

+0

'self.summer = eval (" hashlib.% S() "% csname)' sembra brutto. Che dire di 'self.summer = getattr (hashlib, csname)()'? – glglgl

risposta

7

Sì, questo è un comportamento sorprendente davvero.

Tuttavia, se si guarda l'output di lsof per i due processi secondari paralleli, è facile notare che il secondo processo figlio ha più descrittori di file aperti.

Quello che succede è che quando due processi figli paralleli iniziare il secondo figlio eredita i tubi della controllante, in modo che quando il genitore chiama self.parent_conn.close() il secondo bambino ha ancora quel file pipe descrittore aperto, in modo che la descrizione del file pipe doesn si chiude nel kernel (il conteggio dei riferimenti è maggiore di 0), con l'effetto che self.child_conn.recv_bytes() nel primo processo figlio parallelo mai read() s EOF e EOFError non viene mai generato.

Potrebbe essere necessario inviare un messaggio di spegnimento esplicito, piuttosto che chiudere i descrittori di file perché sembra esserci un controllo limitato su quali descrittori di file vengono condivisi tra i processi (non esiste un contrassegno descrittore di file vicino al fork).

+0

Grazie! Questo ha chiarito le cose per me. L'ho risolto nel mio esempio utilizzando una variabile di classe condivisa contenente tutte le connessioni aperte in tutte le istanze, in modo che i bambini possano chiudere tutti i socket di cui non hanno bisogno. –