2011-12-15 14 views
10

sto usando multiprocessing.Pool()multiprocessing.pool.map e la funzione con due argomenti

ecco quello che voglio Piscina:

def insert_and_process(file_to_process,db): 
    db = DAL("path_to_mysql" + db) 
    #Table Definations 
    db.table.insert(**parse_file(file_to_process)) 
    return True 

if __name__=="__main__": 
    file_list=os.listdir(".") 
    P = Pool(processes=4) 
    P.map(insert_and_process,file_list,db) # here having problem. 

voglio passare 2 argomenti Quello che voglio fare è quello di inizializza solo 4 connessioni DB (qui cercherò di creare una connessione su ogni chiamata di funzione quindi possibilmente milioni di esse e causerà la morte di IO Freezed). se posso creare 4 connessioni db e 1 per ogni processo sarà ok.

Esiste una soluzione per Pool? o dovrei abbandonarlo?

EDIT:

Da aiuto di tutti e due ho ottenuto questo in questo modo:

args=zip(f,cycle(dbs)) 
Out[-]: 
[('f1', 'db1'), 
('f2', 'db2'), 
('f3', 'db3'), 
('f4', 'db4'), 
('f5', 'db1'), 
('f6', 'db2'), 
('f7', 'db3'), 
('f8', 'db4'), 
('f9', 'db1'), 
('f10', 'db2'), 
('f11', 'db3'), 
('f12', 'db4')] 

Così qui come funzionera ', io gonna spostare DB codice di connessione verso il livello principale e fai questo:

def process_and_insert(args): 

    #Table Definations 
    args[1].table.insert(**parse_file(args[0])) 
    return True 

if __name__=="__main__": 
    file_list=os.listdir(".") 
    P = Pool(processes=4) 

    dbs = [DAL("path_to_mysql/database") for i in range(0,3)] 
    args=zip(file_list,cycle(dbs)) 
    P.map(insert_and_process,args) # here having problem. 

Sì, ho intenzione di testarlo e farvelo sapere.

risposta

26

la documentazione Pool non dice di un modo di passare più di un parametro alla funzione target - ho provato solo di passaggio una sequenza, ma non ottiene spiegato (un elemento della sequenza per ogni parametro).

Tuttavia, è possibile scrivere la funzione target aspettarsi che il primo (e unico) il parametro di essere una tupla, in cui ogni elemento è uno dei parametri che ti aspetti:

from itertools import repeat 

def insert_and_process((file_to_process,db)): 
    db = DAL("path_to_mysql" + db) 
    #Table Definations 
    db.table.insert(**parse_file(file_to_process)) 
    return True 

if __name__=="__main__": 
    file_list=os.listdir(".") 
    P = Pool(processes=4) 
    P.map(insert_and_process,zip(file_list,repeat(db))) 

(nota l'extra parentesi nella definizione di insert_and_process - python considera questo come un singolo parametro che dovrebbe essere una sequenza di 2 elementi. Il primo elemento della sequenza è attribuito alla prima variabile e l'altro al secondo)

+4

Si noti che la sintassi 'def f ((arg1, arg2)):' è scomparsa in Python 3. –

+1

@FerdinandBeyer: L'avevo dimenticato. Bene, a meno che l'implementazione di multiprocessing.Pool.map sia diversa lì, la strada da percorrere sarebbe quella di assegnare a un singolo argomento e decomprimerlo all'interno della funzione. – jsbueno

+0

Grazie ho funzionato! ho capito facendo zip (file_list, ciclo (dbs)). Ma io non uso f ((arg1, arg2)). come ho usato più del tuo codice ti ho selezionato! –

8

Il pool genera quattro processi, ciascuno eseguito dalla propria istanza dell'interprete Python. È possibile utilizzare una variabile globale per tenere il vostro oggetto di connessione al database, in modo che esattamente un collegamento viene creato per ogni processo:

global_db = None 

def insert_and_process(file_to_process, db): 
    global global_db 
    if global_db is None: 
     # If this is the first time this function is called within this 
     # process, create a new connection. Otherwise, the global variable 
     # already holds a connection established by a former call. 
     global_db = DAL("path_to_mysql" + db) 
    global_db.table.insert(**parse_file(file_to_process)) 
    return True 

Dal Pool.map() e gli amici supportano solo le funzioni di lavoratori di un argomento, è necessario creare un wrapper che in avanti il lavoro:

def insert_and_process_helper(args): 
    return insert_and_process(*args) 

if __name__ == "__main__": 
    file_list=os.listdir(".") 
    db = "wherever you get your db" 
    # Create argument tuples for each function call: 
    jobs = [(file, db) for file in file_list] 
    P = Pool(processes=4) 
    P.map(insert_and_process_helper, jobs) 
+0

Grazie Ferdinand, questo è vicino a quello che voglio. Quello che voglio fare è creare 4 connessioni DB. Una connessione per ogni processo, ma non per tutte le chiamate di funzione. 'DAL (" Path To db ")' creerà una connessione db. La singola connessione sarà più lenta delle connessioni Quad contemporaneamente. –

+0

Ho provato quegli esempi e ha funzionato correttamente quando la funzione non deve tornare ...; Non possiamo fare qualcosa come my_var = P.map (insert_and_process_helper, jobs)? – neverMind

+0

@neverMind ovviamente puoi –

5

Non c'è bisogno di usa zip.Se per esempio si dispone di 2 parametri, x ed y, e ciascuno di essi possono ottenere diversi valori, come:

X=range(1,6) 
Y=range(10) 

La funzione dovrebbe avere un solo parametro, ed estrarlo all'interno:

def func(params): 
    (x,y)=params 
    ... 

e si chiama così:

params = [(x,y) for x in X for y in Y] 
pool.map(func, params) 
2

Utilizzando

params=[(x,y) for x in X for y in Y] 

si crea una copia completa di x e y, e che può essere più lento rispetto all'utilizzo di

from itertools import repeat 
P.map(insert_and_process,zip(file_list,repeat(db))) 
1

È possibile utilizzare

from functools import partial 

libreria per questo scopo

come

func = partial(rdc, lat, lng) 
r = pool.map(func, range(8)) 

e

def rdc(lat,lng,x): 
    pass 
Problemi correlati