2013-08-24 22 views
18

Ecco il programma:Utilizzo della memoria continuano a crescere con multiprocessing.pool di Python

#!/usr/bin/python 

import multiprocessing 

def dummy_func(r): 
    pass 

def worker(): 
    pass 

if __name__ == '__main__': 
    pool = multiprocessing.Pool(processes=16) 
    for index in range(0,100000): 
     pool.apply_async(worker, callback=dummy_func) 

    # clean up 
    pool.close() 
    pool.join() 

Ho trovato l'utilizzo della memoria (sia VIRT e RES) ha continuato a crescere fino ad close()/join(), c'è qualche soluzione per sbarazzarsi di questo? Ho provato maxtasksperchild con 2.7 ma non è stato d'aiuto neanche.

Ho un programma più complicato che chiama apply_async() ~ 6M volte, e al punto ~ 1.5M ho già ottenuto 6G + RES, per evitare tutti gli altri fattori, ho semplificato il programma alla versione precedente.

EDIT:

Si è rivelato questa versione funziona meglio, grazie per l'input di tutti:

#!/usr/bin/python 

import multiprocessing 

ready_list = [] 
def dummy_func(index): 
    global ready_list 
    ready_list.append(index) 

def worker(index): 
    return index 

if __name__ == '__main__': 
    pool = multiprocessing.Pool(processes=16) 
    result = {} 
    for index in range(0,1000000): 
     result[index] = (pool.apply_async(worker, (index,), callback=dummy_func)) 
     for ready in ready_list: 
      result[ready].wait() 
      del result[ready] 
     ready_list = [] 

    # clean up 
    pool.close() 
    pool.join() 

non ho messo alcuna serratura lì come credo processo principale è a thread singolo (callback è più o meno come una cosa guidata dagli eventi per i documenti che ho letto).

Ho modificato il range dell'indice di v1 su 1.000.000, come su v2 e ho eseguito alcuni test: è strano per me v2 addirittura ~ 10% più veloce di v1 (33s vs 37s), forse v1 stava facendo troppi lavori di manutenzione interni. v2 è sicuramente un vincitore sull'utilizzo della memoria, non ha mai superato i 300M (VIRT) e 50M (RES), mentre la v1 era 370M/120M, la migliore era 330M/85M. Tutti i numeri erano solo 3 ~ 4 volte il test, solo come riferimento.

+1

Solo speculare qui, ma mettere in coda un milione di oggetti occupa spazio. Forse renderli utili aiuterà. I documenti non sono definitivi, ma l'[esempio] (http://pydoc.net/Python/multiprocessing/2.6.2.1/multiprocessing.examples.mp_pool/) (cerca il callback di Test) mostra il risultato apply_async in attesa, anche quando ci sono callback. L'attesa potrebbe essere necessaria per cancellare una coda di risultati. – tdelaney

+0

Quindi multiprocessing.pool potrebbe non essere lo strumento giusto per me, dato che il callback in realtà non fa i lavori di pulizia, è possibile effettuare la pulizia in callback? Il problema è che non posso aspettare dopo la chiamata apply_async() come in real world worker() richiede ~ 0,1 secondi per richiesta (diverse richieste HTTP). –

+1

Wild guess: 'apply_asynch' crea una istanza [' AsynchResult'] (http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.AsyncResult). Il 'Pool' probabilmente ha qualche riferimento a questi oggetti, dal momento che devono essere in grado di restituire il risultato quando il calcolo è terminato, ma nel tuo ciclo li stai semplicemente buttando via. Probabilmente dovresti chiamare 'get()' o 'wait()' sui risultati asynch ad un certo punto, magari usando l'argomento 'callback' di' apply_asynch'. – Bakuriu

risposta

6

Utilizzare map_async anziché apply_async per evitare un utilizzo eccessivo della memoria.

Per il primo esempio, modificare le seguenti due righe:

for index in range(0,100000): 
    pool.apply_async(worker, callback=dummy_func) 

a

pool.map_async(worker, range(100000), callback=dummy_func) 

Si finirà in un batter d'occhio prima di poter vedere il suo utilizzo della memoria in top. Cambia la lista in una più grande per vedere la differenza. Tuttavia, la nota map_async innanzitutto converte il iterable che si passa ad esso in un elenco per calcolarne la lunghezza se non ha il metodo __len__. Se hai un iteratore di un numero enorme di elementi, puoi utilizzare itertools.islice per elaborarli in blocchi più piccoli.

Ho avuto un problema di memoria in un programma di vita reale con molti più dati e alla fine ho trovato che il colpevole era apply_async.

P.S., per quanto riguarda l'utilizzo della memoria, i due esempi non hanno alcuna differenza evidente.

4

Ho un dataset di nuvole di punti 3D molto grande che sto elaborando. Ho provato a utilizzare il modulo multiprocessing per accelerare l'elaborazione, ma ho iniziato a eliminare gli errori di memoria. Dopo alcune ricerche e test ho determinato che stavo riempiendo la coda dei compiti da elaborare molto più rapidamente di quanto i sottoprocessi potessero svuotare. Sono sicuro di smorzare, o usare map_async o qualcosa che avrei potuto aggiustare il carico, ma non volevo apportare grossi cambiamenti alla logica circostante.

La soluzione stupida che ho trovato è controllare la lunghezza dello pool._cache in modo intermittente, e se la cache è troppo grande, attendere che la coda si svuoti.

Nel mio mainloop ho già avuto un contatore e un ticker di stato:

# Update status 
count += 1 
if count%10000 == 0: 
    sys.stdout.write('.') 
    if len(pool._cache) > 1e6: 
     print "waiting for cache to clear..." 
     last.wait() # Where last is assigned the latest ApplyResult 

Così ogni 10k inserimento nella piscina ho controllare se ci sono più di 1 milione di operazioni in coda (circa 1G di memoria utilizzata nella processo principale). Quando la coda è piena, aspetto solo l'ultimo lavoro inserito.

Ora il mio programma può funzionare per ore senza esaurire la memoria. Il processo principale si interrompe occasionalmente mentre i lavoratori continuano a elaborare i dati.

proposito il membro _cache è documentata l'esempio del modulo piscina multiprocessing:

# 
# Check there are no outstanding tasks 
# 

assert not pool._cache, 'cache = %r' % pool._cache 
15

ho avuto problemi di memoria recente, dal momento che stavo usando più volte la funzione di multiprocessing, in modo da evitare che i processi di deposizione delle uova, e lasciandoli in memoria.

Ecco la soluzione che sto usando ora:

def myParallelProcess(ahugearray) 
from multiprocessing import Pool 
from contextlib import closing 
with closing(Pool(15)) as p: 
    res = p.imap_unordered(simple_matching, ahugearray, 100) 
return res 

I ❤ con

+2

Questo ha risolto il mio problema dopo aver trascorso giorni su questo problema! Molte grazie! Stavo creando un pool all'interno di un ciclo, quindi ho finito per generare troppi processi, ognuno dei quali consumava così tanta memoria e non usciva mai. Ho solo bisogno di fare mypool.close() alla fine del ciclo – MohamedEzz

1

Penso che questo è simile a the question I posted, ma non sono sicuro di avere lo stesso ritardo. Il mio problema era che stavo producendo risultati dal pool di multiprocessing più velocemente di quanto li stessimo consumando, quindi si sono accumulati nella memoria. Per evitare ciò, ho utilizzato uno semaphore per limitare gli input nel pool in modo che non si allontanassero troppo dalle uscite che stavo consumando.

0

Basta creare il pool nel loop e chiuderlo alla fine del ciclo con pool.close().

Problemi correlati