2015-07-02 28 views
5

Sto usando python 2.7.10. Ho letto molti file, li ho archiviati in una grande lista, poi ho provato a chiamare multiprocessing e passare la lista grande a quei multiprocessi in modo che ogni processo possa accedere a questa grande lista e fare qualche calcolo.multiprocessing di python, big data turn process in sleep

sto usando pool Ti piace questa:

def read_match_wrapper(args): 
    args2 = args[0] + (args[1],) 
    read_match(*args2) 

pool = multiprocessing.Pool(processes=10) 
result=pool.map(read_match_wrapper,itertools.izip(itertools.repeat((ped_list,chr_map,combined_id_to_id,chr)),range(10))) 
pool.close() 
pool.join() 

Fondamentalmente, io sto passando più variabili di funzione 'read_match'. Per utilizzare pool.map, scrivo la funzione 'read_match_wrapper'. Non ho bisogno di risultati da quei processi. Voglio solo che corrano e finiscano.

Posso far funzionare tutto questo processo quando la lista dei miei dati 'ped_list' è piuttosto piccola. Quando carico tutti i dati, come 10G, quindi tutti i multiprocessi generati mostrano "S" e sembra non funzionare affatto.

Non so se esiste un limite di quanti dati è possibile accedere attraverso la piscina? Ho davvero bisogno di aiuto su questo! Grazie!

+0

Qual è il codice di 'read_match'? –

+0

Potresti controllare la sezione 'itertools.izip()' ha restituito un valore corretto? Se i tuoi dati sono realmente 10G, con 'repeat()', potrebbe peggiorare e probabilmente consumare troppa memoria. – Jkm

+0

Sì, penso che la memoria sia il problema qui. Ogni volta che genera un processo, la memoria viene copiata. E il cluster non può permetterselo! – odeya

risposta

3

Dalle linee guida di programmazione multiprocessing:

Evitare stato condiviso

As far as possible one should try to avoid shifting large amounts of data between processes. 

Cosa si soffre è un sintomo tipico di un tubo pieno, che non ottiene drenato.

Il multiprocessing Python. La pipa utilizzata dalla piscina presenta alcuni difetti di progettazione. Fondamentalmente implementa una sorta di protocollo orientato ai messaggi su una pipe OS che è più simile a un oggetto stream.

Il risultato è che, se si invia un oggetto troppo grande attraverso il tubo, questo si riempirà. Il mittente non sarà in grado di aggiungere contenuti e il ricevitore non sarà in grado di scaricarlo poiché è bloccato in attesa della fine del messaggio.

La prova è che i lavoratori stanno dormendo in attesa di quel messaggio "grasso" che non arriva mai.

È ped_list che contiene i nomi di file o il contenuto del file?

Nel secondo caso preferisci inviare i nomi dei file invece del contenuto. I lavoratori possono recuperare i contenuti stessi con un semplice open().

+0

ped_list contiene il contenuto del file, è una grande lista di liste ... – odeya

+0

L'ho notato da altri post, http://stackoverflow.com/questions/14124588/python-multiprocessing-shared-memory. Dovrei trasformare il mio ped_list in multiprocessing.Array. Ma non so come farlo dato che è un elenco di liste..multiprocessing.Array sembra prendere solo un formato molto semplice – odeya

+1

Indipendentemente dal metodo che utilizzerai per condividere la ped_list, sarà comunque molto lento e soffrirà da deadlock se non * molto * ben implementato. La vera soluzione del problema è ancora quella proposta. Invece di caricare il contenuto del file in ped_list, è sufficiente caricare i nomi dei file e lasciare che i child worker eseguano il caricamento del file stesso. Un altro problema esattamente uguale al tuo: http://stackoverflow.com/questions/27253666/python-multiprocessing-pool-got-stuck-after-long-execution/27757177#27757177 – noxdafox

0

Invece di lavorare con pool.map preferisco usare le code. Si potrebbe generare il numero desiderato di processi e assegnare una coda di ingresso:

n = 10 #number of processes 
tasks = multiprocessing.Queue() 

for i in range(n): #spawn processes 
    multiprocessing.Process(target = read_match_wrapper, args = tasks) 
for element in ped_list: 
    tasks.put(element) 

In questo modo, la coda è riempita da un lato e, allo stesso tempo svuotato dall'altro. Forse è necessario mettere qualcosa in coda prima che i processi vengano avviati. C'è una possibilità che finiscano senza fare nulla in quanto la coda è vuota o solleva un'eccezione Queue.empty.