2012-08-07 40 views
22

Sto utilizzando Process e Queue di multiprocessing. Inizio parecchie funzioni in parallelo e la maggior parte si comporta bene: esse terminano, il loro output va alla loro coda e vengono visualizzate come .is_alive() == False. Ma per qualche ragione un paio di funzioni non si stanno comportando. Mostrano sempre .is_alive() == Vero, anche dopo l'ultima riga della funzione (un'istruzione di stampa che dice "Finito") è completa. Questo succede indipendentemente dal set di funzioni che lancio, anche se ce n'è solo uno. Se non vengono eseguiti in parallelo, le funzioni si comportano correttamente e ritornano normalmente. Che tipo di di cosa potrebbe essere il problema?multiprocessing di python: alcune funzioni non ritornano quando sono complete (materiale in coda troppo grande)

Ecco la funzione generica che sto utilizzando per gestire i lavori. Tutto ciò che non mostro sono le funzioni che sto passando ad esso. Sono lunghi, usano spesso matplotlib, a volte lanciano alcuni comandi della shell, ma non riesco a capire cosa hanno in comune quelli che hanno fallito.

def runFunctionsInParallel(listOf_FuncAndArgLists): 
    """ 
    Take a list of lists like [function, arg1, arg2, ...]. Run those functions in parallel, wait for them all to finish, and return the list of their return values, in order. 
    """ 
    from multiprocessing import Process, Queue 

    def storeOutputFFF(fff,theArgs,que): #add a argument to function for assigning a queue 
     print 'MULTIPROCESSING: Launching %s in parallel '%fff.func_name 
     que.put(fff(*theArgs)) #we're putting return value into queue 
     print 'MULTIPROCESSING: Finished %s in parallel! '%fff.func_name 
     # We get this far even for "bad" functions 
     return 

    queues=[Queue() for fff in listOf_FuncAndArgLists] #create a queue object for each function 
    jobs = [Process(target=storeOutputFFF,args=[funcArgs[0],funcArgs[1:],queues[iii]]) for iii,funcArgs in enumerate(listOf_FuncAndArgLists)] 
    for job in jobs: job.start() # Launch them all 
    import time 
    from math import sqrt 
    n=1 
    while any([jj.is_alive() for jj in jobs]): # debugging section shows progress updates 
     n+=1 
     time.sleep(5+sqrt(n)) # Wait a while before next update. Slow down updates for really long runs. 
     print('\n---------------------------------------------------\n'+ '\t'.join(['alive?','Job','exitcode','Func',])+ '\n---------------------------------------------------') 
     print('\n'.join(['%s:\t%s:\t%s:\t%s'%(job.is_alive()*'Yes',job.name,job.exitcode,listOf_FuncAndArgLists[ii][0].func_name) for ii,job in enumerate(jobs)])) 
     print('---------------------------------------------------\n') 
    # I never get to the following line when one of the "bad" functions is running. 
    for job in jobs: job.join() # Wait for them all to finish... Hm, Is this needed to get at the Queues? 
    # And now, collect all the outputs: 
    return([queue.get() for queue in queues]) 
+1

Colpo completo al buio: i pendenti restituiscono un valore? (letteralmente, hanno 'return' in loro?) – Logan

+0

Tutte le funzioni, buone e cattive, restituiscono una stringa singola (lunga). – CPBL

+0

Tuttavia, se elimini l'uso di Code, il problema scompare. Quindi ... una coda è stata riempita. Posso guardarlo, e sembra a posto, ma in qualche modo il lavoro non sta finendo quando c'è una coda associata (e solo per le funzioni "cattive"). – CPBL

risposta

14

Va bene, sembra che il tubo utilizzato per riempire la coda viene inserito quando l'uscita di una funzione è troppo grande (la mia comprensione greggio? Si tratta di un bug irrisolto/chiuso? http://bugs.python.org/issue8237). Ho modificato il codice nella mia domanda in modo che ci sia del buffering (le code vengono regolarmente svuotate mentre i processi sono in esecuzione), il che risolve tutti i miei problemi. Così ora questo prende una raccolta di compiti (funzioni e relativi argomenti), li lancia e raccoglie le uscite. Vorrei che fosse più semplice/più pulito.

Modifica (2014 Sep; aggiornamento 2017 Nov: riscritta per la leggibilità): Sto aggiornando il codice con i miglioramenti apportati da allora. Il nuovo codice (stessa funzione, ma funzioni migliori) è qui: https://github.com/cpbl/cpblUtilities/blob/master/parallel.py

La descrizione di chiamata è anche sotto.

def runFunctionsInParallel(*args, **kwargs): 
    """ This is the main/only interface to class cRunFunctionsInParallel. See its documentation for arguments. 
    """ 
    return cRunFunctionsInParallel(*args, **kwargs).launch_jobs() 

########################################################################################### 
### 
class cRunFunctionsInParallel(): 
    ### 
    ####################################################################################### 
    """Run any list of functions, each with any arguments and keyword-arguments, in parallel. 
The functions/jobs should return (if anything) pickleable results. In order to avoid processes getting stuck due to the output queues overflowing, the queues are regularly collected and emptied. 
You can now pass os.system or etc to this as the function, in order to parallelize at the OS level, with no need for a wrapper: I made use of hasattr(builtinfunction,'func_name') to check for a name. 
Parameters 
---------- 
listOf_FuncAndArgLists : a list of lists 
    List of up-to-three-element-lists, like [function, args, kwargs], 
    specifying the set of functions to be launched in parallel. If an 
    element is just a function, rather than a list, then it is assumed 
    to have no arguments or keyword arguments. Thus, possible formats 
    for elements of the outer list are: 
     function 
     [function, list] 
     [function, list, dict] 
kwargs: dict 
    One can also supply the kwargs once, for all jobs (or for those 
    without their own non-empty kwargs specified in the list) 
names: an optional list of names to identify the processes. 
    If omitted, the function name is used, so if all the functions are 
    the same (ie merely with different arguments), then they would be 
    named indistinguishably 
offsetsSeconds: int or list of ints 
    delay some functions' start times 
expectNonzeroExit: True/False 
    Normal behaviour is to not proceed if any function exits with a 
    failed exit code. This can be used to override this behaviour. 
parallel: True/False 
    Whenever the list of functions is longer than one, functions will 
    be run in parallel unless this parameter is passed as False 
maxAtOnce: int 
    If nonzero, this limits how many jobs will be allowed to run at 
    once. By default, this is set according to how many processors 
    the hardware has available. 
showFinished : int 
    Specifies the maximum number of successfully finished jobs to show 
    in the text interface (before the last report, which should always 
    show them all). 
Returns 
------- 
Returns a tuple of (return codes, return values), each a list in order of the jobs provided. 
Issues 
------- 
Only tested on POSIX OSes. 
Examples 
-------- 
See the testParallel() method in this module 
    """ 
+1

"Se questo non funziona, forse le cose che stai restituendo dalle tue funzioni non sono selezionabili, e quindi non riescono a passare correttamente attraverso le code." Un grande aiuto, ho avuto questo problema esatto e non sapevo che il "multiprocessing" si basa sul decapaggio per passare oggetti tra processi (compresi i risultati di ritorno). – Michael

+0

Solo un suggerimento, ma dovresti investire un po 'di tempo per renderlo leggibile.Ci sono probabilmente cose davvero utili qui, ma è quasi impossibile dirlo. –

+0

Sì, usiamo questa funzione una tonnellata, con grande effetto. Forse non so come renderlo leggibile, ma gli darò un'altra possibilità. Grazie. https://github.com/cpbl/cpblUtilities/issues/10 – CPBL

Problemi correlati