penso che la soluzione migliore è quella di utilizzare due piscine qui:
from multiprocessing import Pool
# import parsers here
parsers = {
'parser1': parser1.process,
'parser2': parser2.process,
'parser3': parser3.process,
'parser4': parser4.process,
'parser5': parser5.process,
'parser6': parser6.process,
'parser7': parser7.process,
}
# Sets that define which items can use high parallelism,
# and which must use low
high_par = {"parser1", "parser3", "parser4", "parser6", "parser7"}
low_par = {"parser2", "parser5"}
def process_items(key, value):
parsers[key](value)
def run_pool(func, items, num_items, check_set):
pool = Pool(num_items)
out = pool.map(func, (item for item in items if item[0] in check_set))
pool.close()
pool.join()
return out
if __name__ == "__main__":
items = [('parser2', x), ...] # Your list of tuples
# Process with high parallelism
high_results = run_pool(process_items, items, 4, high_par)
# Process with low parallelism
low_results = run_pool(process_items, items, 2, low_par)
Cercando di fare questo in un Pool
è possibile, attraverso l'uso intelligente delle primitive di sincronizzazione, ma non credo che finirà per sembrare molto più pulito di questo. Potrebbe anche finire per funzionare in modo meno efficiente, poiché a volte il pool deve attendere il completamento del lavoro, in modo che possa elaborare un oggetto di parallelismo basso, anche quando gli elementi di parallelismo elevato sono disponibili dietro di esso nella coda.
Questo sarebbe complicarsi un po 'se avevi bisogno di ottenere i risultati di ogni process_items
chiamata nello stesso ordine in cui sono caduto nel iterabile originale, vale a dire i risultati di ogni Pool
necessità di ottenere fuse, ma sulla base di esempio ho non pensare che sia un requisito Fammi sapere se lo è, e cercherò di adattare la mia risposta di conseguenza.
Potresti usare solo due piscine? Innanzitutto, crea un pool che utilizza tutti i core, esegui iterazioni sull'elenco, filtra le voci che richiedono un parallelismo ridotto e chiama 'pool1.map' sugli elementi rimanenti. Quindi chiudi/entra in quella piscina. Quindi crea un nuovo pool con meno processi e chiama 'map' solo sulle voci nel iterable che * do * necessitano di un parallelismo ridotto. – dano
Questa era l'unica opzione che potevo pensare, speravo che potesse esserci un modo più pulito? O forse questo è abbastanza pulito? –
Le altre opzioni a cui posso pensare sono probabilmente * meno * pulite - avresti bisogno di una sorta di sincronizzazione tra tutti i tuoi dipendenti per gestirla con un solo 'pool'. E dovrai anche occuparti dei casi in cui alcuni lavoratori sono impegnati a lavorare quando arrivi a elementi che necessitano di un parallelismo ridotto, il che significa che dovrai attendere fino a quando non saranno terminati altri lavoratori prima di elaborarli. Sembra che finirebbe a essere disordinato per avere ragione. – dano