2010-07-09 14 views
36

Voglio un processo a esecuzione prolungata per restituire il suo avanzamento su una coda (o qualcosa di simile) che alimenterò a una finestra di dialogo della barra di avanzamento. Ho anche bisogno del risultato quando il processo è completato. Un esempio di test qui non riesce con un RuntimeError: Queue objects should only be shared between processes through inheritance.Come si passa un riferimento alla coda a una funzione gestita da pool.map_async()?

import multiprocessing, time 

def task(args): 
    count = args[0] 
    queue = args[1] 
    for i in xrange(count): 
     queue.put("%d mississippi" % i) 
    return "Done" 

def main(): 
    q = multiprocessing.Queue() 
    pool = multiprocessing.Pool() 
    result = pool.map_async(task, [(x, q) for x in range(10)]) 
    time.sleep(1) 
    while not q.empty(): 
     print q.get() 
    print result.get() 

if __name__ == "__main__": 
    main() 

Sono stato in grado di ottenere questo al lavoro utilizzando oggetti singoli Process (dove io sto alowed a passare un riferimento coda), ma poi non ho un pool per gestire i molti processi che voglio lanciare. Qualche consiglio su un modello migliore per questo?

+0

Non è una risposta alla tua domanda, ma prova la libreria 'execnet' per i mapping multi-processo. Il 'multiprocessing' incorporato ha alcuni problemi ancora da risolvere (vedi il tracker Python). Oltre a ciò il suo codice sorgente è abbastanza grande e complicato. La libreria 'execnet' mi sembra molto meglio di' multiprocessing'. –

risposta

43

Il seguente codice sembra funzionare:

import multiprocessing, time 

def task(args): 
    count = args[0] 
    queue = args[1] 
    for i in xrange(count): 
     queue.put("%d mississippi" % i) 
    return "Done" 


def main(): 
    manager = multiprocessing.Manager() 
    q = manager.Queue() 
    pool = multiprocessing.Pool() 
    result = pool.map_async(task, [(x, q) for x in range(10)]) 
    time.sleep(1) 
    while not q.empty(): 
     print q.get() 
    print result.get() 

if __name__ == "__main__": 
    main() 

Si noti che la coda è ottenuto da una manager.Queue() piuttosto che multiprocessing.Queue(). Grazie Alex per avermi indicato in questa direzione.

+0

+1 e solo una breve nota che la tua domanda mi ha aiutato in un problema che ho avuto oggi. Avevo trovato la versione Manager della coda, ma il mio codice non funzionava perché mi affidavo a un globale. Deve essere passato come parametro, come stai facendo. – winwaed

+0

Anche +1 per 'manager.Queue' - molto utile. – fantabolous

8

Fare qglobali opere ...:

import multiprocessing, time 

q = multiprocessing.Queue() 

def task(count): 
    for i in xrange(count): 
     q.put("%d mississippi" % i) 
    return "Done" 

def main(): 
    pool = multiprocessing.Pool() 
    result = pool.map_async(task, range(10)) 
    time.sleep(1) 
    while not q.empty(): 
     print q.get() 
    print result.get() 

if __name__ == "__main__": 
    main() 

Se avete bisogno di più code, per esempio per evitare di confondere i progressi dei vari processi del pool, dovrebbe funzionare un elenco globale di code (ovviamente, ogni processo dovrà quindi sapere quale indice nella lista da utilizzare, ma va bene passare come argomento;).

+0

Funzionerà se il "compito" è definito in un diverso modulo o pacchetto? Il codice di esempio è molto semplificato. Il vero programma ha un'architettura MVC in cui una pipeline produttore-consumatore viene impostata su più core (il modello) e deve inviare aggiornamenti di avanzamento alla GUI wxPython (la vista). – David

+2

@David, puoi provare; se il tuo codice reale non funziona in questo modo semplice, avrai bisogno di spostarti di una tacca nella complessità e andare per un Manager (che può darti dei proxy alle code, ecc.). –

+0

Questo non sembra funzionare affatto. q non restituisce mai nulla q.empty() è sempre True sulla mia macchina. Anche se aumento la chiamata a riposo a 10 secondi, il che dovrebbe essere un tempo eccessivo per l'attività di mettere alcuni messaggi in coda, q.empty restituisce sempre True. – David

Problemi correlati