Uso il modulo multiprocessing per suddividere un'attività molto grande. Funziona per la maggior parte, ma mi manca qualcosa di ovvio con il mio design, perché in questo modo è molto difficile per me dire in modo efficace quando tutti i dati sono stati elaborati.Multiprocessing - design produttore/consumatore
Ho due compiti separati eseguiti; uno che nutre l'altro. Immagino che questo sia un problema produttore/consumatore. Uso una coda condivisa tra tutti i processi, in cui i produttori riempiono la coda, i consumatori leggono dalla coda e eseguono l'elaborazione. Il problema è che c'è una quantità limitata di dati, quindi a un certo punto tutti devono sapere che tutti i dati sono stati elaborati in modo che il sistema possa spegnersi con grazia.
Sembra che abbia senso utilizzare la funzione map_async(), ma poiché i produttori stanno riempiendo la coda, non conosco tutti gli elementi in primo piano, quindi devo entrare in un ciclo while e usa apply_async() e prova a rilevare quando tutto è fatto con una sorta di timeout ... brutto.
Mi sento come se mi mancasse qualcosa di ovvio. Come può essere meglio disegnato?
PRODCUER
class ProducerProcess(multiprocessing.Process):
def __init__(self, item, consumer_queue):
self.item = item
self.consumer_queue = consumer_queue
multiprocessing.Process.__init__(self)
def run(self):
for record in get_records_for_item(self.item): # this takes time
self.consumer_queue.put(record)
def start_producer_processes(producer_queue, consumer_queue, max_running):
running = []
while not producer_queue.empty():
running = [r for r in running if r.is_alive()]
if len(running) < max_running:
producer_item = producer_queue.get()
p = ProducerProcess(producer_item, consumer_queue)
p.start()
running.append(p)
time.sleep(1)
CONSUMATORE
def process_consumer_chunk(queue, chunksize=10000):
for i in xrange(0, chunksize):
try:
# don't wait too long for an item
# if new records don't arrive in 10 seconds, process what you have
# and let the next process pick up more items.
record = queue.get(True, 10)
except Queue.Empty:
break
do_stuff_with_record(record)
PRINCIPALE
if __name__ == "__main__":
manager = multiprocessing.Manager()
consumer_queue = manager.Queue(1024*1024)
producer_queue = manager.Queue()
producer_items = xrange(0,10)
for item in producer_items:
producer_queue.put(item)
p = multiprocessing.Process(target=start_producer_processes, args=(producer_queue, consumer_queue, 8))
p.start()
consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1)
Qui è dove ottiene formaggio. Non posso usare la mappa, perché l'elenco da consumare viene riempito allo stesso tempo. Quindi devo andare in un ciclo while e provare a rilevare un timeout. Il consumer_queue può diventare vuoto mentre i produttori stanno ancora tentando di riempirlo, quindi non posso solo rilevare una coda vuota e uscire da quello.
timed_out = False
timeout= 1800
while 1:
try:
result = consumer_pool.apply_async(process_consumer_chunk, (consumer_queue,), dict(chunksize=chunksize,))
if timed_out:
timed_out = False
except Queue.Empty:
if timed_out:
break
timed_out = True
time.sleep(timeout)
time.sleep(1)
consumer_queue.join()
consumer_pool.close()
consumer_pool.join()
ho pensato che forse avrei potuto ottenere() i record nel thread principale e superare quelli nel consumatore invece di passare la coda, ma penso che io alla fine con lo stesso problema in questo modo. Devo ancora eseguire un ciclo while e usare apply_async() Grazie in anticipo per qualsiasi consiglio!
Penso che funzionerà. Grazie! Non sono sicuro di come il join() funzioni dalla descrizione, ma penso di aver trovato un modo. Ho passato l'evento nel processo start_producer_process() e lo ho impostato() dopo che tutti i produttori hanno finito di aggiungere al consumer_queue. A quel punto (di nuovo nel thread principale) se consumer_queue diventa vuoto, significa che tutto è stato elaborato in modo da poter uscire in sicurezza dal ciclo while. – user1914881
Ci scusiamo per la parte confusa, il join sarebbe nel thread principale in modo che non dovresti uscire dal programma solo dopo che i tuoi produttori sono stati completati e i consumatori hanno appena iniziato a fare il loro lavoro. – sean