2012-07-18 23 views
6

Esiste un modo per inviare nuovamente un pezzo di dati per l'elaborazione, se il calcolo originale ha avuto esito negativo, utilizzando un pool semplice?ripetizioni del pool di multiprocessing in python

import random 
from multiprocessing import Pool 

def f(x): 
    if random.getrandbits(1): 
     raise ValueError("Retry this computation") 
    return x*x 

p = Pool(5) 
# If one of these f(x) calls fails, retry it with another (or same) process 
p.map(f, [1,2,3]) 
+1

Forse si vuole 'ritorno f (x) 'invece di generare un' ValueError'? Solo indovinando ... –

+0

Quanto è alta la probabilità di fallimento nella tua applicazione effettiva? Cioè, quanto è importante che il processo riprenda immediatamente anziché aspettare che gli altri processi finiscano per primi? – Isaac

+0

È una probabilità moderata di errore e non è necessario ripetere immediatamente l'operazione (ma, in caso contrario, è necessario riprovare in parallelo). – ash

risposta

9

Se è possibile (o non mente) riprovare immediatamente, utilizzare un decoratore avvolgendo la funzione:

import random 
from multiprocessing import Pool 
from functools import wraps 

def retry(f): 
    @wraps(f) 
    def wrapped(*args, **kwargs): 
     while True: 
      try: 
       return f(*args, **kwargs) 
      except ValueError: 
       pass 
    return wrapped 

@retry 
def f(x): 
    if random.getrandbits(1): 
     raise ValueError("Retry this computation") 
    return x*x 

p = Pool(5) 
# If one of these f(x) calls fails, retry it with another (or same) process 
p.map(f, [1,2,3]) 
5

È possibile utilizzare un Queue per alimentare di nuovo gli errori nel Pool attraverso un ciclo in avvio Process:

import multiprocessing as mp 
import random 

def f(x): 
    if random.getrandbits(1): 
     # on failure/exception catch 
     f.q.put(x) 
     return None 
    return x*x 

def f_init(q): 
    f.q = q 

def main(pending): 
    total_items = len(pending) 
    successful = [] 
    failure_tracker = [] 

    q = mp.Queue() 
    p = mp.Pool(None, f_init, [q]) 
    results = p.imap(f, pending) 
    retry_results = [] 
    while len(successful) < total_items: 
     successful.extend([r for r in results if not r is None]) 
     successful.extend([r for r in retry_results if not r is None]) 
     failed_items = [] 
     while not q.empty(): 
      failed_items.append(q.get()) 
     if failed_items: 
      failure_tracker.append(failed_items) 
      retry_results = p.imap(f, failed_items); 
    p.close() 
    p.join() 

    print "Results: %s" % successful 
    print "Failures: %s" % failure_tracker 

if __name__ == '__main__': 
    main(range(1, 10)) 

L'output è simile a questo:

Results: [1, 4, 36, 49, 25, 81, 16, 64, 9] 
Failures: [[3, 4, 5, 8, 9], [3, 8, 4], [8, 3], []] 

A Pool gergo essere condivisi tra più processi. Quindi questo approccio basato su Queue. Se si tenta di passare un pool come parametro per i processi di piscine, si ottiene questo errore:

NotImplementedError: pool objects cannot be passed between processes or pickled 

Si potrebbe in alternativa provare un paio di tentativi immediati all'interno della vostra funzione di f, per evitare un sovraccarico di sincronizzazione. È davvero questione di quanto presto la tua funzione dovrebbe aspettare per riprovare, e su quanto sia probabile un successo se riprovati immediatamente.


Old Risposta:Per ragioni di completezza, ecco la mia risposta vecchio, che non è così ottimale come inviare nuovamente direttamente in piscina, ma potrebbe comunque essere rilevante a seconda del caso d'uso , perché fornisce un modo naturale per affrontare/limite n tentativi -Level:

è possibile utilizzare un Queue ai guasti aggregare e inoltrare nuovamente alla fine di ogni esecuzione, eseguendo numerosi test:

import multiprocessing as mp 
import random 


def f(x): 
    if random.getrandbits(1): 
     # on failure/exception catch 
     f.q.put(x) 
     return None 
    return x*x 

def f_init(q): 
    f.q = q 

def main(pending): 
    run_number = 1 
    while pending: 
     jobs = pending 
     pending = [] 

     q = mp.Queue() 
     p = mp.Pool(None, f_init, [q]) 
     results = p.imap(f, jobs) 
     p.close() 

     p.join() 
     failed_items = [] 
     while not q.empty(): 
      failed_items.append(q.get()) 
     successful = [r for r in results if not r is None] 
     print "(%d) Succeeded: %s" % (run_number, successful) 
     print "(%d) Failed: %s" % (run_number, failed_items) 
     print 
     pending = failed_items 
     run_number += 1 

if __name__ == '__main__': 
    main(range(1, 10)) 

con uscita in questo modo:

(1) Succeeded: [9, 16, 36, 81] 
(1) Failed: [2, 1, 5, 7, 8] 

(2) Succeeded: [64] 
(2) Failed: [2, 1, 5, 7] 

(3) Succeeded: [1, 25] 
(3) Failed: [2, 7] 

(4) Succeeded: [49] 
(4) Failed: [2] 

(5) Succeeded: [4] 
(5) Failed: [] 
+0

Ho aggiornato la mia risposta a una che non richiede più esecuzioni e ora funziona sullo stesso pool originale. –

+0

Grazie per la risposta dettagliata. Mi piace l'idea di ripetere i calcoli falliti in una coda. Devo assegnare ad Andrew la taglia perché la sua soluzione fa un semplice tentativo. – ash

+0

@ash Ho fatto menzione di tentativi immediati nella mia risposta, pensando che sarebbe un'aggiunta banale/semplice e non quello che stavi cercando. Si noti inoltre che esso (tentativi immediati) non è ottimale per tutti i casi, specialmente quelli in cui un tentativo immediato ha una bassa probabilità di successo (nel qual caso è molto sub-ottimale in quanto causa la fame di risorse per lavori potenzialmente riusciti.) Congratulazioni a Andrew Comunque. –

Problemi correlati