11

Uso la libreria di multiprocessing python per un algoritmo in cui ho molti worker che elaborano determinati dati e restituiscono i risultati nel processo padre. Uso multiprocessing.Queue per passare i lavori ai lavoratori e in secondo luogo per raccogliere i risultati.Python multiprocessing e gestione delle eccezioni nei lavoratori

Funziona tutto bene, fino a quando il lavoratore non riesce a elaborare alcuni blocchi di dati. Nell'esempio semplificato sotto ciascun lavoratore ha due fasi:

  • inizializzazione - può sicuro, in questo caso lavoratore dovrebbe essere distrutto
  • elaborazione dati - elaborazione di un blocco di dati può fallire, in questo caso operaio deve saltare questo pezzo e continuare con i dati successivi.

Quando una di queste fasi non riesce, si verifica un deadlock dopo il completamento dello script. Questo codice simula il mio problema:

import multiprocessing as mp 
import random 

workers_count = 5 
# Probability of failure, change to simulate failures 
fail_init_p = 0.2 
fail_job_p = 0.3 


#========= Worker ========= 
def do_work(job_state, arg): 
    if random.random() < fail_job_p: 
     raise Exception("Job failed") 
    return "job %d processed %d" % (job_state, arg) 

def init(args): 
    if random.random() < fail_init_p: 
     raise Exception("Worker init failed") 
    return args 

def worker_function(args, jobs_queue, result_queue): 
    # INIT 
    # What to do when init() fails? 
    try: 
     state = init(args) 
    except: 
     print "!Worker %d init fail" % args 
     return 
    # DO WORK 
    # Process data in the jobs queue 
    for job in iter(jobs_queue.get, None): 
     try: 
      # Can throw an exception! 
      result = do_work(state, job) 
      result_queue.put(result) 
     except: 
      print "!Job %d failed, skip..." % job 
     finally: 
      jobs_queue.task_done() 
    # Telling that we are done with processing stop token 
    jobs_queue.task_done() 



#========= Parent ========= 
jobs = mp.JoinableQueue() 
results = mp.Queue() 
for i in range(workers_count): 
    mp.Process(target=worker_function, args=(i, jobs, results)).start() 

# Populate jobs queue 
results_to_expect = 0 
for j in range(30): 
    jobs.put(j) 
    results_to_expect += 1 

# Collecting the results 
# What if some workers failed to process the job and we have 
# less results than expected 
for r in range(results_to_expect): 
    result = results.get() 
    print result 

#Signal all workers to finish 
for i in range(workers_count): 
    jobs.put(None) 

#Wait for them to finish 
jobs.join() 

Ho due domanda su questo codice:

  1. Quando init() fallisce, come rilevare il lavoratore non è valido e non aspettare che finisca?
  2. In caso di errore do_work(), come notificare al processo padre che nella coda dei risultati dovrebbero essere previsti meno risultati?

Grazie per l'aiuto!

risposta

10

Ho modificato leggermente il codice per farlo funzionare (vedere la spiegazione di seguito).

import multiprocessing as mp 
import random 

workers_count = 5 
# Probability of failure, change to simulate failures 
fail_init_p = 0.5 
fail_job_p = 0.4 


#========= Worker ========= 
def do_work(job_state, arg): 
    if random.random() < fail_job_p: 
     raise Exception("Job failed") 
    return "job %d processed %d" % (job_state, arg) 

def init(args): 
    if random.random() < fail_init_p: 
     raise Exception("Worker init failed") 
    return args 

def worker_function(args, jobs_queue, result_queue): 
    # INIT 
    # What to do when init() fails? 
    try: 
     state = init(args) 
    except: 
     print "!Worker %d init fail" % args 
     result_queue.put('init failed') 
     return 
    # DO WORK 
    # Process data in the jobs queue 
    for job in iter(jobs_queue.get, None): 
     try: 
      # Can throw an exception! 
      result = do_work(state, job) 
      result_queue.put(result) 
     except: 
      print "!Job %d failed, skip..." % job 
      result_queue.put('job failed') 


#========= Parent ========= 
jobs = mp.Queue() 
results = mp.Queue() 
for i in range(workers_count): 
    mp.Process(target=worker_function, args=(i, jobs, results)).start() 

# Populate jobs queue 
results_to_expect = 0 
for j in range(30): 
    jobs.put(j) 
    results_to_expect += 1 

init_failures = 0 
job_failures = 0 
successes = 0 
while job_failures + successes < 30 and init_failures < workers_count: 
    result = results.get() 
    init_failures += int(result == 'init failed') 
    job_failures += int(result == 'job failed') 
    successes += int(result != 'init failed' and result != 'job failed') 
    #print init_failures, job_failures, successes 

for ii in range(workers_count): 
    jobs.put(None) 

mie modifiche:

  1. cambiato jobs essere solo una normale Queue (invece di JoinableQueue).
  2. I lavoratori ora comunicano le stringhe dei risultati speciali "init failed" e "job failed".
  3. I monitor di processo master per i suddetti risultati speciali fintantoché sono in vigore condizioni specifiche.
  4. Alla fine, mettere le richieste di "interruzione" (ovvero i lavori None) per tutti i lavoratori che possiedi, a prescindere. Si noti che non tutti questi possono essere estratti dalla coda (nel caso in cui il lavoratore non riuscisse a inizializzare).

A proposito, il codice originale era bello e facile da usare. Il bit delle probabilità casuali è piuttosto interessante.

+2

oppure potresti mettere una tupla '(risultato, errore)' (errore è Nessuno in caso di successo) nella coda dei risultati per evitare comunicazioni in banda per errori. – jfs