2012-08-16 12 views
18

Vorrei eseguire più istanze di program.py contemporaneamente, limitando il numero di istanze in esecuzione contemporaneamente (ad esempio al numero di core CPU disponibili sul mio sistema). Ad esempio, se ho 10 core e devo fare 1000 run di program.py in totale, solo 10 istanze verranno create e funzionanti in un dato momento.Multiprocessing in Python mentre si limita il numero di processi in esecuzione

Ho provato a utilizzare il modulo multiprocessing, il multithreading e l'uso di code, ma non c'è nulla che mi sia sembrato utile per un'implementazione facile. Il più grande problema che ho è trovare un modo per limitare il numero di processi in esecuzione contemporaneamente. Questo è importante perché se creo 1000 processi contemporaneamente, diventa equivalente a un bombardamento a forcella. Non ho bisogno dei risultati restituiti dai processi a livello di codice (vengono inviati su disco) e i processi vengono eseguiti indipendentemente l'uno dall'altro.

Qualcuno può darmi suggerimenti o un esempio di come potrei implementarlo in python o anche bash? Pubblicheremo il codice che ho scritto finora usando le code, ma non funziona come previsto e potrebbe già essere ingiustificato.

Molte grazie.

+2

Avete provato [pool di processi Python] (http://docs.python.org/library/multiprocessing.html#module-multiprocessing.pool)? – C2H5OH

+0

Il modo più semplice per farlo consiste nel creare un programma "controller" che crea il 'multiprocessing.pool' e genera i thread worker (program.py), riallocando il lavoro al termine dell'istanza. – jozzas

+0

Grazie, ci proverò; nel mio primo tentativo, per qualche motivo, sono giunto alla conclusione che multiprocessing.pool non era quello che volevo, ma ora sembra giusto. Quindi in questo caso, i thread di lavoro generano appena program.py (come thread? Con subprocess.Popen)? Potresti per favore pubblicare un esempio approssimativo o implementazione di modelli che potrei seguire? – steadfast

risposta

2

script Bash invece di Python, ma lo uso spesso per semplice elaborazione parallela:

#!/usr/bin/env bash 
waitForNProcs() 
{ 
nprocs=$(pgrep -f $procName | wc -l) 
while [ $nprocs -gt $MAXPROCS ]; do 
    sleep $SLEEPTIME 
    nprocs=$(pgrep -f $procName | wc -l) 
done 
} 
SLEEPTIME=3 
MAXPROCS=10 
procName=myPython.py 
for file in ./data/*.txt; do 
waitForNProcs 
./$procName $file & 
done 

O per casi molto semplici, un'altra opzione è xargs dove P Imposta il numero di procs

find ./data/ | grep txt | xargs -P10 -I SUB ./myPython.py SUB 
3

È necessario utilizzare un supervisore di processo. Un approccio sarebbe utilizzare l'API fornita da Circus per fare ciò "programmaticamente", il sito di documentazione è ora offline ma penso che sia solo un problema temporaneo, in ogni caso, è possibile utilizzare il circo per gestirlo. Un altro approccio sarebbe l'utilizzo del supervisord e l'impostazione del parametro numprocs del processo sul numero di core che avete.

Un esempio utilizzando Circus:

from circus import get_arbiter 

arbiter = get_arbiter("myprogram", numprocesses=3) 
try: 
    arbiter.start() 
finally: 
    arbiter.stop() 
21

So che ha detto che l'approccio Pool.map non ha molto senso per voi. La mappa è solo un modo semplice per dargli una fonte di lavoro e un callable da applicare a ciascuno degli elementi. Il func per la mappa potrebbe essere un qualsiasi punto di ingresso per eseguire il lavoro effettivo sull'arg dato.

Se questo non sembra giusto per te, ho una risposta piuttosto dettagliata qui sull'utilizzo di un modello produttore-consumatore: https://stackoverflow.com/a/11196615/496445

In sostanza, si crea una coda, e iniziare a N numero di lavoratori. Quindi si alimenta la coda dal thread principale o si crea un processo Producer che alimenta la coda. I lavoratori continuano a prendere il lavoro dalla coda e non ci sarà mai più un lavoro simultaneo in atto rispetto al numero di processi che hai avviato.

Hai anche la possibilità di porre un limite alla coda, in modo che blocchi il produttore quando c'è già troppo lavoro in sospeso, se hai bisogno di mettere vincoli anche sulla velocità e sulle risorse che il produttore consuma.

La funzione di lavoro che viene chiamato può fare tutto quello che vuoi. Questo può essere un wrapper attorno ad alcuni comandi di sistema, oppure può importare la tua lib python ed eseguire la routine principale. Esistono sistemi di gestione dei processi specifici che consentono di configurare configurazioni per eseguire i file eseguibili arbitrari con risorse limitate, ma questo è solo un approccio Python di base per farlo.

Frammenti da quella other answer mio:

base Pool:

from multiprocessing import Pool 

def do_work(val): 
    # could instantiate some other library class, 
    # call out to the file system, 
    # or do something simple right here. 
    return "FOO: %s" % val 

pool = Pool(4) 
work = get_work_args() 
results = pool.map(do_work, work) 

Utilizzando un gestore di processi e produttore

from multiprocessing import Process, Manager 
import time 
import itertools 

def do_work(in_queue, out_list): 
    while True: 
     item = in_queue.get() 

     # exit signal 
     if item == None: 
      return 

     # fake work 
     time.sleep(.5) 
     result = item 

     out_list.append(result) 


if __name__ == "__main__": 
    num_workers = 4 

    manager = Manager() 
    results = manager.list() 
    work = manager.Queue(num_workers) 

    # start for workers  
    pool = [] 
    for i in xrange(num_workers): 
     p = Process(target=do_work, args=(work, results)) 
     p.start() 
     pool.append(p) 

    # produce data 
    # this could also be started in a producer process 
    # instead of blocking 
    iters = itertools.chain(get_work_args(), (None,)*num_workers) 
    for item in iters: 
     work.put(item) 

    for p in pool: 
     p.join() 

    print results 
+0

Ottimo esempio, l'ho migliorato ottenendo il numero di CPUS come spiegano in http://stackoverflow.com/questions/6905264/python-multiprocessing-utilizes-only-one-core e quindi ho potuto impostare dinamicamente num_workers in base a le CPU della macchina. –

0

Mentre ci sono molte risposte sull'utilizzo multiprocessing .pool, non ci sono molti frammenti di codice su h di usare multiprocessing.Process, che è davvero più utile quando l'utilizzo della memoria è importante. l'avvio di 1000 processi sovraccaricherà la CPU e ucciderà la memoria. Se ogni processo e le sue pipeline di dati sono ad uso intensivo della memoria, il sistema operativo o Python stesso limiterà il numero di processi paralleli. Ho sviluppato il seguente codice per limitare il numero simultaneo di lavori inoltrati alla CPU in lotti. La dimensione del batch può essere ridimensionata proporzionalmente al numero di core della CPU. Nel mio PC Windows, il numero di processi per batch può essere efficiente fino a 4 volte i corri della CPU disponibili.

import multiprocessing 
def func_to_be_multiprocessed(q,data): 
    q.put(('s')) 
q = multiprocessing.Queue() 
worker = [] 
for p in range(number_of_jobs): 
    worker[p].append(multiprocessing.Process(target=func_to_be_multiprocessed, \ 
     args=(q,data)...)) 
num_cores = multiprocessing.cpu_count() 
Scaling_factor_batch_jobs = 3.0 
num_jobs_per_batch = num_cores * Scaling_factor_batch_jobs 
num_of_batches = number_of_jobs // num_jobs_per_batch 
for i_batch in range(num_of_batches): 
    floor_job = i_batch * num_jobs_per_batch 
    ceil_job = floor_job + num_jobs_per_batch 
    for p in worker[floor_job : ceil_job]: 
             worker.start() 
    for p in worker[floor_job : ceil_job]: 
             worker.join() 
for p in worker[ceil_job :]: 
          worker.start() 
for p in worker[ceil_job :]: 
          worker.join() 
for p in multiprocessing.active_children(): 
          p.terminate() 
result = [] 
for p in worker: 
    result.append(q.get()) 

L'unico problema è, se uno qualsiasi di lavoro in ogni partita non ha potuto completare e porta ad una situazione impiccagione, non sarà avviata resto dei lotti di lavori. Pertanto, la funzione da elaborare deve disporre di routine di gestione degli errori corrette.

Problemi correlati