2013-09-28 10 views
7

Sto cercando di implementare questo multiprocessing tutorial per i miei scopi. All'inizio ho pensato che non fosse scalabile, ma quando ho fatto un esempio riproducibile ho scoperto che se l'elenco degli oggetti supera 124, sembra non restituire mai una risposta. A x = 124 funziona in .4 secondi, ma quando lo imposto su x = 125 non finisce mai. Sto eseguendo Python 2.7 su Windows 7.Elaborazione multipla Python> = 125 lista non finisce mai

from multiprocessing import Lock, Process, Queue, current_process 
import time 

class Testclass(object): 
    def __init__(self, x): 
     self.x = x 

def toyfunction(testclass): 
    testclass.product = testclass.x * testclass.x 
    return testclass 


def worker(work_queue, done_queue): 
    try: 
     for testclass in iter(work_queue.get, 'STOP'): 
      print(testclass.counter) 
      newtestclass = toyfunction(testclass) 
      done_queue.put(newtestclass) 

    except: 
     print('error') 

    return True 

def main(x): 

    counter = 1 

    database = [] 
    while counter <= x: 
     database.append(Testclass(10)) 
     counter += 1 
     print(counter) 



    workers = 8 
    work_queue = Queue() 
    done_queue = Queue() 
    processes = [] 

    start = time.clock() 
    counter = 1 

    for testclass in database: 
     testclass.counter = counter 
     work_queue.put(testclass) 
     counter += 1 
     print(counter) 


    print('items loaded') 
    for w in range(workers): 
     p = Process(target=worker, args=(work_queue, done_queue)) 
     p.start() 
     processes.append(p) 
     work_queue.put('STOP') 

    for p in processes: 
     p.join() 

    done_queue.put('STOP') 

    newdatabase = [] 
    for testclass in iter(done_queue.get, 'STOP'): 
     newdatabase.append(testclass) 

    print(time.clock()-start) 
    print("Done") 
    return(newdatabase) 

if __name__ == '__main__': 
    database = main(124) 
    database2 = main(125) 
+0

Funziona per me. – Veedrac

+0

ha pubblicato un'altra domanda qui che ha funzionato per me. http://stackoverflow.com/questions/19070638/python-multiprocessing-ioerror-errno-232-the-pipe-is-being-closed#comment28188856_19070638 –

+0

Ho risolto il problema nell'altro post. Ho provato a eseguire il codice sopra su entrambi i miei computer e su entrambi si blocca alla fine quando 'x = 125' e non stampa mai Fatto. – Michael

risposta

6

OK! Da the docs:

Attenzione Come accennato in precedenza, se un processo figlio ha messo gli elementi su una coda (e non ha usato JoinableQueue.cancel_join_thread), allora questo processo non terminerà fino a quando tutti gli elementi tamponati sono stati lavati alla pipa. Ciò significa che se provi ad unirti a quel processo potresti ottenere un deadlock a meno che non sei sicuro di che tutti gli elementi che sono stati messi in coda siano stati consumati. Allo stesso modo, se il processo figlio è non-demonico, il processo genitore potrebbe bloccarsi all'uscita quando tenta di di unirsi a tutti i suoi figli non demoniaci. Si noti che una coda creata utilizzando un manager non ha questo problema. Vedi le linee guida sulla programmazione.

Come ho sottolineato in un commento precedente, il codice tenta di .join() processi prima il done_queue coda viene scaricata - e che dopo aver cambiato il codice in modo funky per essere sicuri done_queue fu prosciugato prima .join() 'ing, il codice funzionava bene per un milione di articoli.

Quindi questo è un caso di errore del pilota, sebbene piuttosto oscuro. Per quanto riguarda il motivo per cui il comportamento dipende dal numero passato a main(x), è imprevedibile: dipende da come il buffering viene eseguito internamente. Tale divertimento ;-)

+0

Raccomandi un particolare lavoro in giro per il codice sopra? C'è un altro modo in cui le persone di solito ottengono il prodotto finito dalla coda? – Michael

+3

Ho già spiegato in un commento come ho "corretto" questo: Modifica 'worker()' per fare 'done_queue.put ('STOP')' prima che finisca. Cambia l'iterazione del programma principale su 'done_queue' per contare il numero di volte che vede' 'STOP'' - è fatto quando vede 'lavoratori' di loro. Sposta il '.join()' s ** dopo ** l'iterazione su 'done_queue' è finita. –

Problemi correlati