Quello che state cercando è un modello produttore/consumatore
filettatura esempio di base
Ecco un esempio di base utilizzando il (invece di multiprocessing)
import threading
import Queue
import sys
def do_work(in_queue, out_queue):
while True:
item = in_queue.get()
# process
result = item
out_queue.put(result)
in_queue.task_done()
if __name__ == "__main__":
work = Queue.Queue()
results = Queue.Queue()
total = 20
# start for workers
for i in xrange(4):
t = threading.Thread(target=do_work, args=(work, results))
t.daemon = True
t.start()
# produce data
for i in xrange(total):
work.put(i)
work.join()
# get the results
for i in xrange(total):
print results.get()
sys.exit()
È wouldn condividere l'oggetto file con i thread. Si produrrebbe lavoro per loro fornendo il queue con la linea di dati. Quindi ogni thread lo preleva e lo elabora, quindi lo restituisce in coda.
Ci sono alcuni servizi più avanzati incorporati nello multiprocessing module per condividere dati, come gli elenchi e . Ci sono dei compromessi nell'utilizzo dei thread multiprocessing vs e dipende dal fatto che il tuo lavoro sia legato alla cpu o legato all'IO.
base multiprocessing.Pool esempio
Ecco un esempio davvero di base di un multiprocessing Pool
from multiprocessing import Pool
def process_line(line):
return "FOO: %s" % line
if __name__ == "__main__":
pool = Pool(4)
with open('file.txt') as source_file:
# chunk the work into batches of 4 lines at a time
results = pool.map(process_line, source_file, 4)
print results
A Pool è un oggetto convenienza che gestisce i propri processi. Poiché un file aperto può scorrere le sue linee, è possibile passarlo alla mappa, che si sovrapporrà ad esso e fornirà le linee alla funzione worker. Map blocchi e restituisce l'intero risultato al termine. Tieni presente che nell'esempio troppo semplice, lo map
consumerà il tuo file tutto in una volta prima di estrarre il lavoro. Quindi, fai attenzione se è più grande. Esistono modi più avanzati per progettare una configurazione produttore/consumatore.
manuale "pool" con limite e la linea di ri-ordinamento
Questo è un esempio manuale del Pool.map, ma invece di consumare un intero iterabile, è possibile impostare una dimensione della coda in modo che si stanno alimentando solo pezzo per pezzo il più velocemente possibile. Ho anche aggiunto i numeri di riga in modo da poterli rintracciare e ricorrere se lo si desidera in seguito.
from multiprocessing import Process, Manager
import time
import itertools
def do_work(in_queue, out_list):
while True:
item = in_queue.get()
line_no, line = item
# exit signal
if line == None:
return
# fake work
time.sleep(.5)
result = (line_no, line)
out_list.append(result)
if __name__ == "__main__":
num_workers = 4
manager = Manager()
results = manager.list()
work = manager.Queue(num_workers)
# start for workers
pool = []
for i in xrange(num_workers):
p = Process(target=do_work, args=(work, results))
p.start()
pool.append(p)
# produce data
with open("source.txt") as f:
iters = itertools.chain(f, (None,)*num_workers)
for num_and_line in enumerate(iters):
work.put(num_and_line)
for p in pool:
p.join()
# get the results
# example: [(1, "foo"), (10, "bar"), (0, "start")]
print sorted(results)
Bella domanda. Ho anche avuto questo dubbio. Anche se sono andato con la possibilità di rompere il file in file più piccoli :) –