2010-12-01 20 views
6

chiunque può aiutarmi con la condivisione di una lista tra più processi python. Il problema è ottenere self.ID_List e self.mps_in_process che funzionano nel seguente codice.utilizzando la lista con multiprocessing di python

import time, random 
from multiprocessing import Process #, Manager, Array, Queue 

class MP_Stuff(): 
    def __init__(self, parent, id): 
     time.sleep(1 + random.random()*10) # simulate data processing 
     parent.killMP(id) 

class ParamHandler(): 
    def doFirstMP(self, IDs): 
     self.mps_in_process = [] 
     self.ID_List = IDs 
     id = self.ID_List.pop(0) 
     p = Process(target=MP_Stuff, args=(self, id)) 
     self.mps_in_process.append(id) 
     p.start() 

    def doMP(self): 
     for tmp in range(3): # nr of concurrent processes 
      if len(self.ID_List) > 0: 
       id = self.ID_List.pop(0) 
       p = Process(target=MP_Stuff, args=(self, id)) 
       self.mps_in_process.append(id) 
       p.start() 

    def killMP(self, kill_id): 
     self.mps_in_process.remove(kill_id) 
     self.doMP() 

if __name__ == '__main__': 
    ID_List = [1,2,3,4,5,6] 
    paramSet = ParamHandler() 
    paramSet.doFirstMP(ID_List)

Molto brevemente, la funzione del codice, è che alcuni dati (qui, il tempo casuale in MP_Stuff) sono trattati secondo id dati in self.ID_List. Per sapere quanti ID dei dati sono in elaborazione viene utilizzato self.mps_in_process (qui i processi di nr sono hardcoded, ma in realtà è dinamico).

Il problema è condividere mps_in_process e ID_List tra più processi. Il codice attuale va in loop praticamente infinito. Ciò che va storto è in realtà ben descritto nella libreria multiprocessing:

"se il codice eseguito in un processo figlio tenta di accedere a una variabile globale, il valore che esso vede (se presente) potrebbe non essere lo stesso del valore in il processo genitore nel momento in cui è stato chiamato Process.start(). "

Tuttavia, non sono in grado di capire come far funzionare mps_in_process e ID_List. Non riesco ad usare la coda, dato che il modo in cui gli elementi sono estratti da mps_in_process è casuale. Non riesco ad usare la matrice, perché .pop (0) non funziona. Non riesco a utilizzare Manager(). List(), perché .remove() e len (ID_List) non funzionano quindi. L'uso del threading invece del multiprocessing non è una soluzione, poiché è necessario utilizzare freeze_support() in seguito.

Pertanto, qualsiasi aiuto su come condividere la lista tra i processi è molto gradito!

risposta

3

Il gestore funziona correttamente (incluso len()). Il problema con il tuo codice è che nel tuo processo principale, non aspetti fino al termine dell'elaborazione, quindi il processo principale termina e il gestore non è più accessibile. Inoltre non conosco l'atomicità del pop di ListProxy, quindi forse un blocco sarebbe utile.

La soluzione è p.join().

Tuttavia, sono confuso perché è sufficiente fare p.join alla fine di doFirstMP. Sarei felice se qualcuno potesse spiegare perché partecipare al primo p ritorna dopo che tutto il calcolo è stato fatto e non dopo i primi ritorni di doMP.

Il mio codice:

import time, random 
from multiprocessing import Process, Manager 

class MP_Stuff(): 
    def __init__(self, parent, id): 
     time.sleep(1 + random.random()*5) # simulate data processing 
     print id , "done" 
     parent.killMP(id) 

class ParamHandler():  
    def doFirstMP(self, IDs): 
     self.mps_in_process = [] 
     self.ID_List = Manager().list(IDs) 
     id = self.ID_List.pop(0) 
     p = Process(target=MP_Stuff, args=(self, id)) 
     self.mps_in_process.append(id) 
     p.start() 
     p.join() 
     print "joined" 

    def doMP(self): 
     for tmp in range(3): # nr of concurrent processes 
      print self.ID_List 
      if len(self.ID_List) > 0: 
       id = self.ID_List.pop(0) 
       p = Process(target=MP_Stuff, args=(self, id)) 
       self.mps_in_process.append(id) 
       p.start() 

    def killMP(self, kill_id): 
     print "kill", kill_id 
     self.mps_in_process.remove(kill_id) 
     self.doMP() 

if __name__ == '__main__': 
    ID_List = [1,2,3,4,5,6] 
    paramSet = ParamHandler() 
    paramSet.doFirstMP(ID_List) 
+0

grazie, funziona. Penso che p.join alla fine di doFirstMP sia sufficiente, perché tutti gli altri sottoprocessi vengono generati da questo (chiamata a doMP alla fine di killMP). Ora anche __main__ non finisce, prima che p.join sia fatto. In realtà, p.join non dovrebbe essere chiamato in doMP, poiché i sottoprocessi non sarebbero concomitanti. – bitman

+0

Certo, ma potresti avere un pool di processi (o solo un elenco di essi) e unirti a esso.Quello che non capisco è ciò che blocca la chiusura del primo processo generato, dato che doMP dovrebbe semplicemente creare i processi e ritornare immediatamente. – Krab

0

Sfortunatamente hai già specificato le opzioni.

Entrambi Array() e Manager().list() dovrebbero essere in grado di farlo, anche se potrebbe essere necessario un piccolo lavoro extra.

  • è possibile emulare un len(ID_List) memorizzando l'importo in un Value() e l'incremento/decremento di esso.
  • Il remove() può essere facilmente emulato con un loop e un delete dopo di esso (anche se più lento del corso).
Problemi correlati