2012-06-25 15 views
55

Ho un unico grande file di testo nel quale voglio elaborare ogni riga (fare alcune operazioni) e memorizzarle in un database. Dal momento che un singolo programma semplice sta impiegando troppo tempo, voglio farlo attraverso più processi o thread. Ogni thread/processo dovrebbe leggere i dati DIVERENTI (linee diverse) da quel singolo file e fare alcune operazioni sulla loro parte di dati (linee) e inserirli nel database in modo che, alla fine, io abbia tutti i dati elaborati e il mio database viene scaricato con i dati di cui ho bisogno.Elaborazione di un singolo file da più processi in python

Ma non sono in grado di capire come avvicinarsi a questo.

+2

Bella domanda. Ho anche avuto questo dubbio. Anche se sono andato con la possibilità di rompere il file in file più piccoli :) –

risposta

70

Quello che state cercando è un modello produttore/consumatore

filettatura esempio di base

Ecco un esempio di base utilizzando il (invece di multiprocessing)

import threading 
import Queue 
import sys 

def do_work(in_queue, out_queue): 
    while True: 
     item = in_queue.get() 
     # process 
     result = item 
     out_queue.put(result) 
     in_queue.task_done() 

if __name__ == "__main__": 
    work = Queue.Queue() 
    results = Queue.Queue() 
    total = 20 

    # start for workers 
    for i in xrange(4): 
     t = threading.Thread(target=do_work, args=(work, results)) 
     t.daemon = True 
     t.start() 

    # produce data 
    for i in xrange(total): 
     work.put(i) 

    work.join() 

    # get the results 
    for i in xrange(total): 
     print results.get() 

    sys.exit() 

È wouldn condividere l'oggetto file con i thread. Si produrrebbe lavoro per loro fornendo il queue con la linea di dati. Quindi ogni thread lo preleva e lo elabora, quindi lo restituisce in coda.

Ci sono alcuni servizi più avanzati incorporati nello multiprocessing module per condividere dati, come gli elenchi e . Ci sono dei compromessi nell'utilizzo dei thread multiprocessing vs e dipende dal fatto che il tuo lavoro sia legato alla cpu o legato all'IO.

base multiprocessing.Pool esempio

Ecco un esempio davvero di base di un multiprocessing Pool

from multiprocessing import Pool 

def process_line(line): 
    return "FOO: %s" % line 

if __name__ == "__main__": 
    pool = Pool(4) 
    with open('file.txt') as source_file: 
     # chunk the work into batches of 4 lines at a time 
     results = pool.map(process_line, source_file, 4) 

    print results 

A Pool è un oggetto convenienza che gestisce i propri processi. Poiché un file aperto può scorrere le sue linee, è possibile passarlo alla mappa, che si sovrapporrà ad esso e fornirà le linee alla funzione worker. Map blocchi e restituisce l'intero risultato al termine. Tieni presente che nell'esempio troppo semplice, lo map consumerà il tuo file tutto in una volta prima di estrarre il lavoro. Quindi, fai attenzione se è più grande. Esistono modi più avanzati per progettare una configurazione produttore/consumatore.

manuale "pool" con limite e la linea di ri-ordinamento

Questo è un esempio manuale del Pool.map, ma invece di consumare un intero iterabile, è possibile impostare una dimensione della coda in modo che si stanno alimentando solo pezzo per pezzo il più velocemente possibile. Ho anche aggiunto i numeri di riga in modo da poterli rintracciare e ricorrere se lo si desidera in seguito.

from multiprocessing import Process, Manager 
import time 
import itertools 

def do_work(in_queue, out_list): 
    while True: 
     item = in_queue.get() 
     line_no, line = item 

     # exit signal 
     if line == None: 
      return 

     # fake work 
     time.sleep(.5) 
     result = (line_no, line) 

     out_list.append(result) 


if __name__ == "__main__": 
    num_workers = 4 

    manager = Manager() 
    results = manager.list() 
    work = manager.Queue(num_workers) 

    # start for workers  
    pool = [] 
    for i in xrange(num_workers): 
     p = Process(target=do_work, args=(work, results)) 
     p.start() 
     pool.append(p) 

    # produce data 
    with open("source.txt") as f: 
     iters = itertools.chain(f, (None,)*num_workers) 
     for num_and_line in enumerate(iters): 
      work.put(num_and_line) 

    for p in pool: 
     p.join() 

    # get the results 
    # example: [(1, "foo"), (10, "bar"), (0, "start")] 
    print sorted(results) 
+0

sì, il file è più grande, circa 1 GB o giù di lì. Non so quanto tu intenda dire più grande, 1 GB per me è più grande. – pranavk

+0

Questo va bene. Sono sicuro che puoi prendere questi esempi ed estrapolarti per le tue esigenze. Il threading va bene così com'è. Il multiprocessore ha bisogno solo di una coda simile da alimentare. – jdi

+1

Questo è buono, ma cosa succede se l'elaborazione è vincolata all'I/O? In tal caso, il parallelismo potrebbe rallentare piuttosto che accelerarlo. Le ricerche all'interno di una singola traccia del disco sono molto più veloci rispetto alla ricerca intertrack e l'I/O in parallelo tende a introdurre ricerche intertrack in quello che altrimenti sarebbe un carico I/O sequenziale. Per trarre qualche vantaggio dall'I/O parallelo, a volte aiuta un po 'a usare un mirror RAID. – user1277476

-4

interrompe bene il singolo file grande in più file più piccoli e ciascuno di essi viene elaborato in thread separati.

+4

puoi mostrare un po 'di codice? – maq

+0

questo non è che l'OP vuole !! ma solo per un'idea ... non male. – DRPK

5

Ecco un esempio veramente stupido che ho cucinato fino:

import os.path 
import multiprocessing 

def newlinebefore(f,n): 
    f.seek(n) 
    c=f.read(1) 
    while c!='\n' and n > 0: 
     n-=1 
     f.seek(n) 
     c=f.read(1) 

    f.seek(n) 
    return n 

filename='gpdata.dat' #your filename goes here. 
fsize=os.path.getsize(filename) #size of file (in bytes) 

#break the file into 20 chunks for processing. 
nchunks=20 
initial_chunks=range(1,fsize,fsize/nchunks) 

#You could also do something like: 
#initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too. 


with open(filename,'r') as f: 
    start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks])) 

end_byte=[i-1 for i in start_byte] [1:] + [None] 

def process_piece(filename,start,end): 
    with open(filename,'r') as f: 
     f.seek(start+1) 
     if(end is None): 
      text=f.read() 
     else: 
      nbytes=end-start+1 
      text=f.read(nbytes) 

    # process text here. createing some object to be returned 
    # You could wrap text into a StringIO object if you want to be able to 
    # read from it the way you would a file. 

    returnobj=text 
    return returnobj 

def wrapper(args): 
    return process_piece(*args) 

filename_repeated=[filename]*len(start_byte) 
args=zip(filename_repeated,start_byte,end_byte) 

pool=multiprocessing.Pool(4) 
result=pool.map(wrapper,args) 

#Now take your results and write them to the database. 
print "".join(result) #I just print it to make sure I get my file back ... 

La parte difficile è quello di fare in modo che abbiamo diviso il file su caratteri di nuova riga in modo da non perdere alcuna riga (o leggi solo linee parziali).Quindi, ogni processo legge che fa parte del file e restituisce un oggetto che può essere inserito nel database dal thread principale. Naturalmente, potrebbe anche essere necessario eseguire questa parte in blocchi in modo che non sia necessario conservare tutte le informazioni in memoria contemporaneamente. (questo è abbastanza facile: basta dividere la lista "args" in X pezzi e chiamare pool.map(wrapper,chunk) - Vedi here)

Problemi correlati