2013-03-09 13 views
29

Sto provando a eseguire qualche codice Python su diversi file in parallelo. Il costrutto è fondamentalmente:Il pool di multiprocessing Python si blocca al join?

def process_file(filename, foo, bar, baz=biz): 
    # do stuff that may fail and cause exception 

if __name__ == '__main__': 
    # setup code setting parameters foo, bar, and biz 

    psize = multiprocessing.cpu_count()*2 
    pool = multiprocessing.Pool(processes=psize) 

    map(lambda x: pool.apply_async(process_file, (x, foo, bar), dict(baz=biz)), sys.argv[1:]) 
    pool.close() 
    pool.join() 

ho usato in precedenza pool.map di fare qualcosa di simile e ha funzionato grande, ma io non riesco a utilizzare che qui perché pool.map non (sembra) permettetemi di passare argomenti extra (e usare lambda per farlo non funzionerà perché lambda non può essere marshalling).

Così ora sto cercando di far funzionare le cose usando apply_async() direttamente. Il mio problema è che il codice sembra bloccarsi e non uscire mai. Alcuni file non riescono con un'eccezione, ma non vedo perché ciò che potrebbe causare un errore/hang del join? È interessante notare che se nessuno dei file fallisce con un'eccezione, esce in modo pulito.

Cosa mi manca?

Edit: Quando la funzione (e quindi un lavoratore) non riesce, vedo questa eccezione:

Exception in thread Thread-3: 
Traceback (most recent call last): 
    File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner 
    self.run() 
    File "/usr/lib/python2.7/threading.py", line 505, in run 
    self.__target(*self.__args, **self.__kwargs) 
    File "/usr/lib/python2.7/multiprocessing/pool.py", line 376, in _handle_results 
    task = get() 
TypeError: ('__init__() takes at least 3 arguments (1 given)', <class 'subprocess.CalledProcessError'>,()) 

se vedo anche uno solo di questi, il processo padre processo si blocca per sempre, non raccogliendo i figli e in uscita .

+0

Il tuo codice sembra funzionare correttamente, anche se lancio eccezioni casuali in "file_processo". Quindi forse ha a che fare con ciò che stai facendo in 'process_file' che sta causando i problemi. – robertklep

+0

Huh. quale versione di Python? Sono a 2,7. process_file nel programma reale è piuttosto complesso, facendo un uso pesante di PIL, NetworkX, poly2tri e altre librerie. Conosco almeno 2 posizioni in cui ho riscontrato bug che possono causare eccezioni in alcuni casi, ma devo semplicemente ignorare quegli errori e andare avanti. Sono perplesso sul motivo per cui non uscirebbe mai per me, ma lavorare per voi. – clemej

+0

2.7.2, questo è quello che ho provato con: https://gist.github.com/robertklep/5125319 – robertklep

risposta

40

Mi spiace rispondere alla mia stessa domanda, ma ho trovato almeno una soluzione alternativa, nel caso in cui qualcun altro abbia un problema simile, desidero pubblicarlo qui. Accetterò qualsiasi risposta migliore là fuori.

Credo che la radice del problema sia http://bugs.python.org/issue9400. Questo mi dice due cose:

  • io non sono pazzo, quello che sto cercando di fare davvero dovrebbe funzionare
  • Almeno in python2, è molto difficile se non impossibile sottaceto 'eccezioni' torna al processo genitore. Quelli semplici funzionano, ma molti altri no.

Nel mio caso, la funzione del mio lavoratore stava lanciando un sottoprocesso che era segfaulting. Ciò ha restituito l'eccezione CalledProcessError, che non è selezionabile. Per qualche ragione, ciò rende l'oggetto del pool nel genitore uscire a pranzo e non tornare dalla chiamata a join().

Nel mio caso particolare, non mi interessa quale fosse l'eccezione. Al massimo voglio registrarlo e andare avanti. Per fare questo, semplicemente riprendo la mia funzione top worker in una clausola try/except. Se il lavoratore genera un'eccezione, viene catturato prima di tentare di tornare al processo principale, registrato e quindi il processo di lavoro si chiude normalmente poiché non tenta più di inviare l'eccezione. Vedi sotto:

def process_file_wrapped(filenamen, foo, bar, baz=biz): 
    try: 
     process_file(filename, foo, bar, baz=biz) 
    except: 
     print('%s: %s' % (filename, traceback.format_exc())) 

Poi, ho il mio process_file_wrapped mappa chiamata di funzione iniziale() al posto di quello originale. Ora il mio codice funziona come previsto.

+7

Non devi scusarti per aver risposto alla tua domanda. Questa pagina ora documenta un problema reale con una soluzione alternativa. Quello è buono. –

+1

A proposito, un'altra soluzione potrebbe essere quella di prendere solo il messaggio di errore dell'eccezione e generare utilizzando la classe "Exception" di base, che presumo sia selezionabile. –

+1

Sono ancora nuovo su StackExchange e non sono sicuro dell'etichetta. Dato che lo snippet di @robertklep dei commenti sopra funziona con una semplice Exception(), sospetto che vada bene anche io ... ma la linea di fondo è che devi prendere tutte le eccezioni e restituire uno conosciuto per lavorare. – clemej

4

È possibile utilizzare un'istanza functools.partial anziché lambda nei casi in cui l'oggetto deve essere sottoposto a decapaggio. Gli oggetti partial sono selezionabili da Python 2.7 (e in Python 3).

pool.map(functools.partial(process_file, x, foo, bar, baz=biz), sys.argv[1:]) 
+0

Hmm. Non ho usato functools prima. Grazie per le informazioni. Sospetto ancora che questo continui a soffrire dello stesso problema di propagazione delle eccezioni. – clemej

+0

Possibilmente; Non posso dirlo Hai detto che hai avuto un precedente successo con 'pool.map', quindi forse questo ti sarà d'aiuto. – nneonneo

+0

Ho usato pool.map in un contesto completamente diverso, dove le cose non avrebbero potuto causare eccezioni. Avrei dovuto essere più chiaro a riguardo nella domanda. – clemej

2

Per quello che vale, ho avuto un bug simile (non lo stesso) quando pool.map appeso. Il mio caso d'uso mi ha permesso di usare pool.terminate per risolverlo (assicurati che anche tu lo faccia prima di cambiare materiale).

ho usato pool.map prima di chiamare terminate quindi so tutto finito, dal docs:

Un equivalente parallelo della mappa() funzione built-in (che supporta solo un argomento iterabile però). Blocca finché il risultato non è pronto.

Se questo è il tuo caso d'uso, questo potrebbe essere un modo per applicarlo.

Problemi correlati