2012-02-13 17 views
7

Solo sperimentazione e apprendimento, e so come creare un dizionario condiviso a cui è possibile accedere con più processi, ma non sono sicuro di come mantenere sincronizzato il dict. defaultdict, credo, illustra il problema che sto avendo.Utilizzo di defaultdict con multiprocessing?

from collections import defaultdict 
from multiprocessing import Pool, Manager, Process 

#test without multiprocessing 
s = 'mississippi' 
d = defaultdict(int) 
for k in s: 
    d[k] += 1 

print d.items() # Success! result: [('i', 4), ('p', 2), ('s', 4), ('m', 1)] 
print '*'*10, ' with multiprocessing ', '*'*10 

def test(k, multi_dict): 
    multi_dict[k] += 1 

if __name__ == '__main__': 
    pool = Pool(processes=4) 
    mgr = Manager() 
    multi_d = mgr.dict() 
    for k in s: 
     pool.apply_async(test, (k, multi_d)) 

    # Mark pool as closed -- no more tasks can be added. 
    pool.close() 

    # Wait for tasks to exit 
    pool.join() 

    # Output results 
    print multi_d.items() #FAIL 

print '*'*10, ' with multiprocessing and process module like on python site example', '*'*10 
def test2(k, multi_dict2): 
    multi_dict2[k] += 1 


if __name__ == '__main__': 
    manager = Manager() 

    multi_d2 = manager.dict() 
    for k in s: 
     p = Process(target=test2, args=(k, multi_d2)) 
    p.start() 
    p.join() 

    print multi_d2 #FAIL 

Il primo risultato funziona (perché la sua non utilizzando multiprocessing), ma sto avendo problemi a farla funzionare con multiprocessing. Non sono sicuro di come risolverlo, ma penso che potrebbe esserci perché non è stato sincronizzato (e unire i risultati più tardi) o forse perché entro multiprocessing non riesco a capire come impostare defaultdict(int) nel dizionario.

Qualsiasi aiuto o suggerimento su come farlo funzionare sarebbe fantastico!

risposta

10

È possibile creare una sottoclasse BaseManager e registrare ulteriori tipi per la condivisione. È necessario fornire un tipo di proxy adatto nei casi in cui il tipo predefinito AutoProxy non funzioni. Per defaultdict, se è necessario accedere solo agli attributi già presenti in dict, è possibile utilizzare DictProxy.

from multiprocessing import Pool 
from multiprocessing.managers import BaseManager, DictProxy 
from collections import defaultdict 

class MyManager(BaseManager): 
    pass 

MyManager.register('defaultdict', defaultdict, DictProxy) 

def test(k, multi_dict): 
    multi_dict[k] += 1 

if __name__ == '__main__': 
    pool = Pool(processes=4) 
    mgr = MyManager() 
    mgr.start() 
    multi_d = mgr.defaultdict(int) 
    for k in 'mississippi': 
     pool.apply_async(test, (k, multi_d)) 
    pool.close() 
    pool.join() 
    print multi_d.items() 
+1

Wow, ha funzionato, grazie. Non capisco le tue modifiche, qual è lo scopo della classe MyManager (BaseManager)? – Lostsoul

+0

@Lostsoul È [il modo documentato] (http://docs.python.org/library/multiprocessing.html#customized-managers) per aggiungere il supporto per la condivisione di altri tipi rispetto a quello supportato da Manager. –

+0

Grazie mille, lo studierò! – Lostsoul

2

Bene, la classe Manager sembra fornire solo un numero fisso di strutture di dati predefinite che possono essere condivise tra i processi e defaultdict non è tra questi. Se davvero solo bisogno che uno defaultdict, la soluzione più semplice sarebbe quella di implementare il comportamento inadempiente sul proprio:

def test(k, multi_dict): 
    if k not in multi_dict: 
     multi_dict[k] = 0 
    multi_dict[k] += 1