2013-03-20 11 views
7

Sto usando Python 2.7.3. Ho parallelizzato del codice usando gli oggetti multiprocessing.Process sottoclassi. Se non ci sono errori nel codice nei miei oggetti Process sottoclassi, tutto funziona correttamente. Ma se ci sono errori nel codice nei miei oggetti Process sottoclassi, appariranno in modo anomalo in modo anomalo (nessun stacktrace stampato sulla shell madre) e l'utilizzo della CPU scenderà a zero. Il codice padre non si blocca mai, dando l'impressione che l'esecuzione sia semplicemente sospesa. Nel frattempo è davvero difficile rintracciare l'errore nel codice perché non viene fornita alcuna indicazione su dove si trova l'errore.Il processo di multiprocessing in Python si blocca in modo invisibile all'utente

Non riesco a trovare altre domande su stackoverflow che gestiscono lo stesso problema.

Immagino che gli oggetti Process sottoclassi sembrino bloccarsi in modo silenzioso perché non possono stampare un messaggio di errore nella shell del genitore, ma vorrei sapere cosa posso fare al riguardo in modo da poter eseguire il debugging in modo più efficiente (e così anche altri utenti del mio codice possono dirmi quando si imbattono anche in problemi).

EDIT: il mio codice attuale è troppo complesso, ma un esempio banale di un oggetto di processo sottoclasse con un errore in esso sarebbe qualcosa di simile a questo:

from multiprocessing import Process, Queue 

class Worker(Process): 

    def __init__(self, inputQueue, outputQueue): 

     super(Worker, self).__init__() 

     self.inputQueue = inputQueue 
     self.outputQueue = outputQueue 

    def run(self): 

     for i in iter(self.inputQueue.get, 'STOP'): 

      # (code that does stuff) 

      1/0 # Dumb error 

      # (more code that does stuff) 

      self.outputQueue.put(result) 
+2

Puoi pubblicare un banco di prova minima che illustra questo problema? – Blender

+0

@Blender Sì. Aggiunto del codice. – hendra

risposta

12

Quello che vuoi veramente è un modo per passare le eccezioni fino al processo genitore, giusto? Quindi puoi gestirli come preferisci.

Se si utilizza concurrent.futures.ProcessPoolExecutor, questo è automatico. Se usi multiprocessing.Pool, è banale. Se usi esplicitamente lo Process e il Queue, devi fare un po 'di lavoro, ma non è lo quello.

Ad esempio:

def run(self): 
    try: 
     for i in iter(self.inputQueue.get, 'STOP'): 
      # (code that does stuff) 
      1/0 # Dumb error 
      # (more code that does stuff) 
      self.outputQueue.put(result) 
    except Exception as e: 
     self.outputQueue.put(e) 

Poi, il codice chiamante può solo leggere Exception s fuori la coda come qualsiasi altra cosa. Invece di questo:

yield outq.pop() 

fare questo:

result = outq.pop() 
if isinstance(result, Exception): 
    raise result 
yield result 

(non so che cosa il vostro codice di genitore-processo di coda di lettura effettivo fa, perché il campione minimo solo ignora la coda, ma si spera. questo spiega l'idea, anche se il tuo codice reale in realtà non funziona in questo modo.)

Si presume che si desideri interrompere qualsiasi eccezione non gestita fino a run. Se si desidera passare l'eccezione e passare al successivo i in iter, è sufficiente spostare lo try nello for, anziché attorno ad esso.

Si presume inoltre che Exception s non siano valori validi.Se questo è un problema, la soluzione più semplice è quella di spingere solo (result, exception) tuple:

def run(self): 
    try: 
     for i in iter(self.inputQueue.get, 'STOP'): 
      # (code that does stuff) 
      1/0 # Dumb error 
      # (more code that does stuff) 
      self.outputQueue.put((result, None)) 
    except Exception as e: 
     self.outputQueue.put((None, e)) 

Poi, il codice popping fa questo:

result, exception = outq.pop() 
if exception: 
    raise exception 
yield result 

Si può notare che questo è simile al callback node.js stile, dove si passa (err, result) ad ogni richiamo. Sì, è fastidioso e hai intenzione di rovinare il codice in quello stile. Ma in realtà non lo stai usando, tranne nel wrapper; tutto il codice "application-level" che ottiene i valori fuori dalla coda o viene chiamato all'interno di run vede solo i normali rendimenti/rendimenti e le eccezioni sollevate.

Si può anche prendere in considerazione la possibilità di costruire un Future con le specifiche di concurrent.futures (o usare quella classe così com'è), anche se si sta facendo la coda di lavoro e l'esecuzione manuale. Non è così difficile, e ti dà una API molto bella, specialmente per il debug.

Infine, vale la pena notare che la maggior parte del codice costruito intorno lavoratori e le code può essere reso molto più semplice con un design esecutore/piscina, anche se sei assolutamente sicuro di voler un solo lavoratore per coda. Appena rottami tutto il boilerplate, e ruotare l'anello nel metodo Worker.run in una funzione (che solo return s o raise s come normale, invece di aggiungere ad una coda). Dal lato della chiamata, ancora una volta scartare tutto il boilerplate e solo submit o map la funzione del lavoro con i suoi parametri.

Tutto il tuo esempio può essere ridotto a:

def job(i): 
    # (code that does stuff) 
    1/0 # Dumb error 
    # (more code that does stuff) 
    return result 

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor: 
    results = executor.map(job, range(10)) 

e sarà gestire automaticamente le eccezioni correttamente.


Come indicato nei commenti, il traceback per un'eccezione non riconduce al processo figlio; si spinge fino alla chiamata manuale raise result (o, se si sta usando un pool o un esecutore, il coraggio del pool o dell'esecutore).

La ragione è che multiprocessing.Queue è costruito in cima pickle, e le eccezioni di decapaggio non Pickle loro traceback. E la ragione di ciò è che non puoi mettere sottosopra i traceback. E la ragione è che i traceback sono pieni di riferimenti al contesto di esecuzione locale, quindi farli lavorare in un altro processo sarebbe molto difficile.

Quindi ... cosa puoi fare a riguardo? Non andare alla ricerca di una soluzione completamente generale. Invece, pensa a ciò di cui hai effettivamente bisogno. Il 90% del tempo, ciò che si vuole è "log eccezione, con traceback, e continua" o "stampare l'eccezione, con traceback, per stderr e exit(1) come il gestore non gestita-eccezioni di default". Per entrambi, non è necessario passare un'eccezione; basta formattarlo sul lato bambino e passare una stringa sopra. Se lo ha bisogno di qualcosa di più elaborato, risolvi esattamente ciò che ti serve e passa le informazioni sufficienti per metterlo insieme manualmente. Se non sai come formattare traceback ed eccezioni, consulta il modulo traceback. È piuttosto semplice E questo significa che non è necessario entrare nel meccanismo del sottaceto. (Non è molto difficile per un pickler o scrivere una classe titolare con un metodo __reduce__ o qualcosa del genere, ma se non è necessario, perché imparare tutto questo?)

+1

Grazie! È grandioso Ma c'è un modo per stampare l'intera traccia dello stack? Mi dice che ora c'è un errore, e che cos'è, ma non DOVE nella classe Worker si verifica l'errore. – hendra

+0

@npo: aggiungerò alla risposta per spiegarlo. – abarnert

+0

Come può essere applicato a 'apply_async', che utilizza solo una funzione che ha lo scopo di restituire alcuni risultati a un callback. Racchiudiamo solo le parti interne della funzione asincrona in una prova/eccetto e quindi restituiamo un oggetto di eccezione alla richiamata? – CMCDragonkai

1

Questa non è una risposta, solo un commento esteso. Si prega di eseguire il programma di dirci quello di uscita (se presente) si ottiene:

from multiprocessing import Process, Queue 

class Worker(Process): 

    def __init__(self, inputQueue, outputQueue): 

     super(Worker, self).__init__() 

     self.inputQueue = inputQueue 
     self.outputQueue = outputQueue 

    def run(self): 

     for i in iter(self.inputQueue.get, 'STOP'): 

      # (code that does stuff) 

      1/0 # Dumb error 

      # (more code that does stuff) 

      self.outputQueue.put(result) 

if __name__ == '__main__': 
    inq, outq = Queue(), Queue() 
    inq.put(1) 
    inq.put('STOP') 
    w = Worker(inq, outq) 
    w.start() 

ottengo:

% test.py 
Process Worker-1: 
Traceback (most recent call last): 
    File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap 
    self.run() 
    File "/home/unutbu/pybin/test.py", line 21, in run 
    1/0 # Dumb error 
ZeroDivisionError: integer division or modulo by zero 

Sono sorpresa (se) si ottiene nulla.

+0

Sarei sorpreso se non avesse ottenuto nulla su POSIX in una shell. Ma su Windows, o in IDLE o PyDev, o se il processo genitore è un'app della GUI ... Non sarei disposto a scommettere in entrambi i modi ... – abarnert

+0

@unutbu Non ho ottenuto nulla. Utilizzo di Windows a 64 bit e IDLE. – hendra

+0

@npo: Ok, e cosa succede se lo si esegue da una console? – unutbu

2

Suggerisco tale soluzione per mostrare le eccezioni di processo

from multiprocessing import Queue, Process, RawValue, Semaphore, Lock, Pool 
import traceback 
run_old = Process.run 

def run_new(*args, **kwargs): 
    try: 
     run_old(*args, **kwargs) 
    except (KeyboardInterrupt, SystemExit): 
     raise 
    except: 
     traceback.print_exc(file=sys.stdout) 

Process.run = run_new 
+0

semplice, la migliore risposta – CloudyGloudy

Problemi correlati