2012-12-20 22 views
6

Uso il modulo multiprocessing per suddividere un'attività molto grande. Funziona per la maggior parte, ma mi manca qualcosa di ovvio con il mio design, perché in questo modo è molto difficile per me dire in modo efficace quando tutti i dati sono stati elaborati.Multiprocessing - design produttore/consumatore

Ho due compiti separati eseguiti; uno che nutre l'altro. Immagino che questo sia un problema produttore/consumatore. Uso una coda condivisa tra tutti i processi, in cui i produttori riempiono la coda, i consumatori leggono dalla coda e eseguono l'elaborazione. Il problema è che c'è una quantità limitata di dati, quindi a un certo punto tutti devono sapere che tutti i dati sono stati elaborati in modo che il sistema possa spegnersi con grazia.

Sembra che abbia senso utilizzare la funzione map_async(), ma poiché i produttori stanno riempiendo la coda, non conosco tutti gli elementi in primo piano, quindi devo entrare in un ciclo while e usa apply_async() e prova a rilevare quando tutto è fatto con una sorta di timeout ... brutto.

Mi sento come se mi mancasse qualcosa di ovvio. Come può essere meglio disegnato?

PRODCUER

class ProducerProcess(multiprocessing.Process): 
    def __init__(self, item, consumer_queue): 
     self.item = item 
     self.consumer_queue = consumer_queue 
     multiprocessing.Process.__init__(self) 

    def run(self): 
     for record in get_records_for_item(self.item): # this takes time 
      self.consumer_queue.put(record) 

def start_producer_processes(producer_queue, consumer_queue, max_running): 
    running = [] 

    while not producer_queue.empty(): 
     running = [r for r in running if r.is_alive()] 
     if len(running) < max_running: 
      producer_item = producer_queue.get() 
      p = ProducerProcess(producer_item, consumer_queue) 
      p.start() 
      running.append(p) 
     time.sleep(1) 

CONSUMATORE

def process_consumer_chunk(queue, chunksize=10000): 
    for i in xrange(0, chunksize): 
     try: 
      # don't wait too long for an item 
      # if new records don't arrive in 10 seconds, process what you have 
      # and let the next process pick up more items. 

      record = queue.get(True, 10) 
     except Queue.Empty:     
      break 

     do_stuff_with_record(record) 

PRINCIPALE

if __name__ == "__main__": 
    manager = multiprocessing.Manager() 
    consumer_queue = manager.Queue(1024*1024) 
    producer_queue = manager.Queue() 

    producer_items = xrange(0,10) 

    for item in producer_items: 
     producer_queue.put(item) 

    p = multiprocessing.Process(target=start_producer_processes, args=(producer_queue, consumer_queue, 8)) 
    p.start() 

    consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1) 

Qui è dove ottiene formaggio. Non posso usare la mappa, perché l'elenco da consumare viene riempito allo stesso tempo. Quindi devo andare in un ciclo while e provare a rilevare un timeout. Il consumer_queue può diventare vuoto mentre i produttori stanno ancora tentando di riempirlo, quindi non posso solo rilevare una coda vuota e uscire da quello.

timed_out = False 
    timeout= 1800 
    while 1: 
     try: 
      result = consumer_pool.apply_async(process_consumer_chunk, (consumer_queue,), dict(chunksize=chunksize,)) 
      if timed_out: 
       timed_out = False 

     except Queue.Empty: 
      if timed_out: 
       break 

      timed_out = True 
      time.sleep(timeout) 
     time.sleep(1) 

    consumer_queue.join() 
    consumer_pool.close() 
    consumer_pool.join() 

ho pensato che forse avrei potuto ottenere() i record nel thread principale e superare quelli nel consumatore invece di passare la coda, ma penso che io alla fine con lo stesso problema in questo modo. Devo ancora eseguire un ciclo while e usare apply_async() Grazie in anticipo per qualsiasi consiglio!

risposta

2

È possibile utilizzare un manager.Event per segnalare la fine del lavoro. Questo evento può essere condiviso tra tutti i tuoi processi e quindi quando lo segnali dal tuo processo principale, gli altri lavoratori possono quindi chiudersi con grazia.

Quindi, i consumatori attendono che l'evento venga impostato e gestiscono la pulizia una volta impostato.

Per determinare quando impostare questo flag è possibile eseguire uno join sui thread di produzione e quando questi sono tutti completi è possibile quindi partecipare ai thread di consumo.

+0

Penso che funzionerà. Grazie! Non sono sicuro di come il join() funzioni dalla descrizione, ma penso di aver trovato un modo. Ho passato l'evento nel processo start_producer_process() e lo ho impostato() dopo che tutti i produttori hanno finito di aggiungere al consumer_queue. A quel punto (di nuovo nel thread principale) se consumer_queue diventa vuoto, significa che tutto è stato elaborato in modo da poter uscire in sicurezza dal ciclo while. – user1914881

+0

Ci scusiamo per la parte confusa, il join sarebbe nel thread principale in modo che non dovresti uscire dal programma solo dopo che i tuoi produttori sono stati completati e i consumatori hanno appena iniziato a fare il loro lavoro. – sean

0

Desidero fortemente raccomandare SimPy invece di multiprocesso/threading per eseguire la simulazione di evento discreta.