Quello che vuoi veramente è un modo per passare le eccezioni fino al processo genitore, giusto? Quindi puoi gestirli come preferisci.
Se si utilizza concurrent.futures.ProcessPoolExecutor
, questo è automatico. Se usi multiprocessing.Pool
, è banale. Se usi esplicitamente lo Process
e il Queue
, devi fare un po 'di lavoro, ma non è lo quello.
Ad esempio:
def run(self):
try:
for i in iter(self.inputQueue.get, 'STOP'):
# (code that does stuff)
1/0 # Dumb error
# (more code that does stuff)
self.outputQueue.put(result)
except Exception as e:
self.outputQueue.put(e)
Poi, il codice chiamante può solo leggere Exception
s fuori la coda come qualsiasi altra cosa. Invece di questo:
yield outq.pop()
fare questo:
result = outq.pop()
if isinstance(result, Exception):
raise result
yield result
(non so che cosa il vostro codice di genitore-processo di coda di lettura effettivo fa, perché il campione minimo solo ignora la coda, ma si spera. questo spiega l'idea, anche se il tuo codice reale in realtà non funziona in questo modo.)
Si presume che si desideri interrompere qualsiasi eccezione non gestita fino a run
. Se si desidera passare l'eccezione e passare al successivo i in iter
, è sufficiente spostare lo try
nello for
, anziché attorno ad esso.
Si presume inoltre che Exception
s non siano valori validi.Se questo è un problema, la soluzione più semplice è quella di spingere solo (result, exception)
tuple:
def run(self):
try:
for i in iter(self.inputQueue.get, 'STOP'):
# (code that does stuff)
1/0 # Dumb error
# (more code that does stuff)
self.outputQueue.put((result, None))
except Exception as e:
self.outputQueue.put((None, e))
Poi, il codice popping fa questo:
result, exception = outq.pop()
if exception:
raise exception
yield result
Si può notare che questo è simile al callback node.js stile, dove si passa (err, result)
ad ogni richiamo. Sì, è fastidioso e hai intenzione di rovinare il codice in quello stile. Ma in realtà non lo stai usando, tranne nel wrapper; tutto il codice "application-level" che ottiene i valori fuori dalla coda o viene chiamato all'interno di run
vede solo i normali rendimenti/rendimenti e le eccezioni sollevate.
Si può anche prendere in considerazione la possibilità di costruire un Future
con le specifiche di concurrent.futures
(o usare quella classe così com'è), anche se si sta facendo la coda di lavoro e l'esecuzione manuale. Non è così difficile, e ti dà una API molto bella, specialmente per il debug.
Infine, vale la pena notare che la maggior parte del codice costruito intorno lavoratori e le code può essere reso molto più semplice con un design esecutore/piscina, anche se sei assolutamente sicuro di voler un solo lavoratore per coda. Appena rottami tutto il boilerplate, e ruotare l'anello nel metodo Worker.run
in una funzione (che solo return
s o raise
s come normale, invece di aggiungere ad una coda). Dal lato della chiamata, ancora una volta scartare tutto il boilerplate e solo submit
o map
la funzione del lavoro con i suoi parametri.
Tutto il tuo esempio può essere ridotto a:
def job(i):
# (code that does stuff)
1/0 # Dumb error
# (more code that does stuff)
return result
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
results = executor.map(job, range(10))
e sarà gestire automaticamente le eccezioni correttamente.
Come indicato nei commenti, il traceback per un'eccezione non riconduce al processo figlio; si spinge fino alla chiamata manuale raise result
(o, se si sta usando un pool o un esecutore, il coraggio del pool o dell'esecutore).
La ragione è che multiprocessing.Queue
è costruito in cima pickle
, e le eccezioni di decapaggio non Pickle loro traceback. E la ragione di ciò è che non puoi mettere sottosopra i traceback. E la ragione è che i traceback sono pieni di riferimenti al contesto di esecuzione locale, quindi farli lavorare in un altro processo sarebbe molto difficile.
Quindi ... cosa puoi fare a riguardo? Non andare alla ricerca di una soluzione completamente generale. Invece, pensa a ciò di cui hai effettivamente bisogno. Il 90% del tempo, ciò che si vuole è "log eccezione, con traceback, e continua" o "stampare l'eccezione, con traceback, per stderr
e exit(1)
come il gestore non gestita-eccezioni di default". Per entrambi, non è necessario passare un'eccezione; basta formattarlo sul lato bambino e passare una stringa sopra. Se lo ha bisogno di qualcosa di più elaborato, risolvi esattamente ciò che ti serve e passa le informazioni sufficienti per metterlo insieme manualmente. Se non sai come formattare traceback ed eccezioni, consulta il modulo traceback
. È piuttosto semplice E questo significa che non è necessario entrare nel meccanismo del sottaceto. (Non è molto difficile per un pickler o scrivere una classe titolare con un metodo __reduce__
o qualcosa del genere, ma se non è necessario, perché imparare tutto questo?)
Puoi pubblicare un banco di prova minima che illustra questo problema? – Blender
@Blender Sì. Aggiunto del codice. – hendra