2015-04-21 14 views
6

Ho uno script python che esegue un metodo in parallelo.Come modificare il numero di processi paralleli?

parsers = { 
    'parser1': parser1.process, 
    'parser2': parser2.process 
} 

def process((key, value)): 
    parsers[key](value) 

pool = Pool(4) 
pool.map(process_items, items) 

process_items è il mio metodo e items è una lista di tuple con due elementi ad ogni tupla. L'elenco items ha circa 100k elementi.

process_items chiamerà un metodo in base ai parametri forniti. Il mio problema è forse il 70% della lista che posso eseguire con un elevato parallelismo, ma l'altro 30% può funzionare solo con 1/2 thread altrimenti causerà un errore al di fuori del mio controllo.

Quindi nel mio codice ho circa 10 diversi processi parser. Per esempio 1-8 voglio correre con Pool (4) ma 9-10 Pool (2).

Qual è il modo migliore per ottimizzare questo?

+0

Potresti usare solo due piscine? Innanzitutto, crea un pool che utilizza tutti i core, esegui iterazioni sull'elenco, filtra le voci che richiedono un parallelismo ridotto e chiama 'pool1.map' sugli elementi rimanenti. Quindi chiudi/entra in quella piscina. Quindi crea un nuovo pool con meno processi e chiama 'map' solo sulle voci nel iterable che * do * necessitano di un parallelismo ridotto. – dano

+0

Questa era l'unica opzione che potevo pensare, speravo che potesse esserci un modo più pulito? O forse questo è abbastanza pulito? –

+0

Le altre opzioni a cui posso pensare sono probabilmente * meno * pulite - avresti bisogno di una sorta di sincronizzazione tra tutti i tuoi dipendenti per gestirla con un solo 'pool'. E dovrai anche occuparti dei casi in cui alcuni lavoratori sono impegnati a lavorare quando arrivi a elementi che necessitano di un parallelismo ridotto, il che significa che dovrai attendere fino a quando non saranno terminati altri lavoratori prima di elaborarli. Sembra che finirebbe a essere disordinato per avere ragione. – dano

risposta

2

penso che la soluzione migliore è quella di utilizzare due piscine qui:

from multiprocessing import Pool 
# import parsers here 

parsers = { 
    'parser1': parser1.process, 
    'parser2': parser2.process, 
    'parser3': parser3.process, 
    'parser4': parser4.process, 
    'parser5': parser5.process, 
    'parser6': parser6.process, 
    'parser7': parser7.process, 
} 

# Sets that define which items can use high parallelism, 
# and which must use low 
high_par = {"parser1", "parser3", "parser4", "parser6", "parser7"} 
low_par = {"parser2", "parser5"} 

def process_items(key, value): 
    parsers[key](value) 

def run_pool(func, items, num_items, check_set): 
    pool = Pool(num_items) 
    out = pool.map(func, (item for item in items if item[0] in check_set)) 
    pool.close() 
    pool.join() 
    return out 

if __name__ == "__main__": 
    items = [('parser2', x), ...] # Your list of tuples 
    # Process with high parallelism 
    high_results = run_pool(process_items, items, 4, high_par) 
    # Process with low parallelism 
    low_results = run_pool(process_items, items, 2, low_par) 

Cercando di fare questo in un Pool è possibile, attraverso l'uso intelligente delle primitive di sincronizzazione, ma non credo che finirà per sembrare molto più pulito di questo. Potrebbe anche finire per funzionare in modo meno efficiente, poiché a volte il pool deve attendere il completamento del lavoro, in modo che possa elaborare un oggetto di parallelismo basso, anche quando gli elementi di parallelismo elevato sono disponibili dietro di esso nella coda.

Questo sarebbe complicarsi un po 'se avevi bisogno di ottenere i risultati di ogni process_items chiamata nello stesso ordine in cui sono caduto nel iterabile originale, vale a dire i risultati di ogni Pool necessità di ottenere fuse, ma sulla base di esempio ho non pensare che sia un requisito Fammi sapere se lo è, e cercherò di adattare la mia risposta di conseguenza.

+0

Grazie di averlo provato. Questo è probabilmente il modo migliore per mantenere le cose semplici. –

1

È possibile specificare il numero di thread paralleli nel costruttore per multiprocessing.Pool:

from multiprocessing import Pool 

def f(x): 
    return x*x 

if __name__ == '__main__': 
    pool = Pool(5) # 5 is the number of parallel threads 
    print pool.map(f, [1, 2, 3]) 
+0

Scusa se ho aggiornato la mia domanda in quanto non mi sono spiegato correttamente. Ho già impostato i processi in modo che solo uno/due dei miei processi non possano funzionare in quel parallelismo e falliranno. –

Problemi correlati