2012-06-15 17 views
9

Quando si esegue un numero elevato di attività (con parametri di grandi dimensioni) utilizzando Pool.apply_async, i processi vengono allocati e passano allo stato di attesa e non vi è alcun limite per il numero di processi in attesa. Questo può finire mangiando tutta la memoria, come nell'esempio qui sotto:Multiprocessing di Python: come limitare il numero di processi in attesa?

import multiprocessing 
import numpy as np 

def f(a,b): 
    return np.linalg.solve(a,b) 

def test(): 

    p = multiprocessing.Pool() 
    for _ in range(1000): 
     p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000))) 
    p.close() 
    p.join() 

if __name__ == '__main__': 
    test() 

Sto cercando un modo per limitare la coda di attesa, in modo tale che non v'è solo un numero limitato di processi in attesa, e Pool.apply_async è bloccato mentre la coda di attesa è piena.

+0

Bell'esempio (+1). – mgilson

risposta

6

multiprocessing.Pool ha un _taskqueue membro di tipo multiprocessing.Queue, che accetta un parametro opzionale maxsize; sfortunatamente lo costruisce senza il set di parametri maxsize.

io consiglierei di sottoclassi multiprocessing.Pool con un copia-incolla di multiprocessing.Pool.__init__ che passa maxsize-_taskqueue costruttore.

scimmia-patching l'oggetto (o la piscina o la coda) sarebbe anche funzionare, ma dovreste monkeypatch pool._taskqueue._maxsize e pool._taskqueue._sem quindi sarebbe molto fragili:

pool._taskqueue._maxsize = maxsize 
pool._taskqueue._sem = BoundedSemaphore(maxsize) 
+1

Sto usando Python 2.7.3 e il _taskqueue è di tipo Queue.Queue. Significa che è una coda semplice, e non una multiprocessing.Queue. Subclassing multiprocessing.Pool e overriding __init__ funziona bene, ma la patch delle scimmie non funziona come previsto. Tuttavia, questo è l'hack che stavo cercando, grazie. –

0

si potrebbe aggiungere in coda esplicito con parametro maxsize e utilizzare queue.put() invece di pool.apply_async() in questo caso. Poi i processi di lavoro potrebbe:

for a, b in iter(queue.get, sentinel): 
    # process it 

Se si vuole limitare il numero di argomenti di input creato/i risultati che sono nella memoria a circa il numero di processi di lavoro attivi allora si potrebbe utilizzare pool.imap*() metodi:

#!/usr/bin/env python 
import multiprocessing 
import numpy as np 

def f(a_b): 
    return np.linalg.solve(*a_b) 

def main(): 
    args = ((np.random.rand(1000,1000), np.random.rand(1000)) 
      for _ in range(1000)) 
    p = multiprocessing.Pool() 
    for result in p.imap_unordered(f, args, chunksize=1): 
     pass 
    p.close() 
    p.join() 

if __name__ == '__main__': 
    main() 
+0

L'uso di 'imap' non fa differenza. La coda di input è ancora illimitata e l'utilizzo di questa soluzione finirà per consumare tutta la memoria. – Radim

+0

@Radim: il codice 'imap' nella risposta funziona anche se gli dai un generatore infinito. – jfs

+0

Non in Python 2, sfortunatamente (non ho guardato il codice in py3). Per un po 'di lavoro, vedi [questa risposta SO] (http://stackoverflow.com/questions/5318936/python-multiprocessing-pool-lazy-iteration). – Radim

1

aspettare se pool._taskqueue superi la dimensione desiderata:

import multiprocessing 
import numpy as np 
import time 

def f(a,b): 
    return np.linalg.solve(a,b) 

def test(max_apply_size=100): 
    p = multiprocessing.Pool() 
    for _ in range(1000): 
     p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000))) 

     while pool._taskqueue.qsize() > max_apply_size: 
      time.sleep(1) 

    p.close() 
    p.join() 

if __name__ == '__main__': 
    test() 
+0

Voglio solo aggiungere che ho trovato che questa è la soluzione più semplice ai miei problemi di memoria con multiprocessing. Ho usato max_apply_size = 10 e questo funziona bene per il mio problema, che è una lenta conversione di file. L'uso di un semaforo come @ecatmur suggerisce sembra una soluzione più solida ma potrebbe essere eccessivo per gli script semplici. – Nate

Problemi correlati