Sto utilizzando Process e Queue di multiprocessing. Inizio parecchie funzioni in parallelo e la maggior parte si comporta bene: esse terminano, il loro output va alla loro coda e vengono visualizzate come .is_alive() == False. Ma per qualche ragione un paio di funzioni non si stanno comportando. Mostrano sempre .is_alive() == Vero, anche dopo l'ultima riga della funzione (un'istruzione di stampa che dice "Finito") è completa. Questo succede indipendentemente dal set di funzioni che lancio, anche se ce n'è solo uno. Se non vengono eseguiti in parallelo, le funzioni si comportano correttamente e ritornano normalmente. Che tipo di di cosa potrebbe essere il problema?multiprocessing di python: alcune funzioni non ritornano quando sono complete (materiale in coda troppo grande)
Ecco la funzione generica che sto utilizzando per gestire i lavori. Tutto ciò che non mostro sono le funzioni che sto passando ad esso. Sono lunghi, usano spesso matplotlib, a volte lanciano alcuni comandi della shell, ma non riesco a capire cosa hanno in comune quelli che hanno fallito.
def runFunctionsInParallel(listOf_FuncAndArgLists):
"""
Take a list of lists like [function, arg1, arg2, ...]. Run those functions in parallel, wait for them all to finish, and return the list of their return values, in order.
"""
from multiprocessing import Process, Queue
def storeOutputFFF(fff,theArgs,que): #add a argument to function for assigning a queue
print 'MULTIPROCESSING: Launching %s in parallel '%fff.func_name
que.put(fff(*theArgs)) #we're putting return value into queue
print 'MULTIPROCESSING: Finished %s in parallel! '%fff.func_name
# We get this far even for "bad" functions
return
queues=[Queue() for fff in listOf_FuncAndArgLists] #create a queue object for each function
jobs = [Process(target=storeOutputFFF,args=[funcArgs[0],funcArgs[1:],queues[iii]]) for iii,funcArgs in enumerate(listOf_FuncAndArgLists)]
for job in jobs: job.start() # Launch them all
import time
from math import sqrt
n=1
while any([jj.is_alive() for jj in jobs]): # debugging section shows progress updates
n+=1
time.sleep(5+sqrt(n)) # Wait a while before next update. Slow down updates for really long runs.
print('\n---------------------------------------------------\n'+ '\t'.join(['alive?','Job','exitcode','Func',])+ '\n---------------------------------------------------')
print('\n'.join(['%s:\t%s:\t%s:\t%s'%(job.is_alive()*'Yes',job.name,job.exitcode,listOf_FuncAndArgLists[ii][0].func_name) for ii,job in enumerate(jobs)]))
print('---------------------------------------------------\n')
# I never get to the following line when one of the "bad" functions is running.
for job in jobs: job.join() # Wait for them all to finish... Hm, Is this needed to get at the Queues?
# And now, collect all the outputs:
return([queue.get() for queue in queues])
Colpo completo al buio: i pendenti restituiscono un valore? (letteralmente, hanno 'return' in loro?) – Logan
Tutte le funzioni, buone e cattive, restituiscono una stringa singola (lunga). – CPBL
Tuttavia, se elimini l'uso di Code, il problema scompare. Quindi ... una coda è stata riempita. Posso guardarlo, e sembra a posto, ma in qualche modo il lavoro non sta finendo quando c'è una coda associata (e solo per le funzioni "cattive"). – CPBL