2013-03-29 14 views
6

Questo sembra un problema semplice ma non riesco a capirlo.Scrive i dati in un file hdf utilizzando il multiprocessing

Ho una simulazione che viene eseguita in un ciclo double for e scrive i risultati in un file HDF. Una versione semplice di questo programma è il seguente:

import tables as pt 

a = range(10) 
b = range(5) 

def Simulation(): 
    hdf = pt.openFile('simulation.h5',mode='w') 
    for ii in a: 
     print(ii) 
     hdf.createGroup('/','A%s'%ii) 
     for i in b: 
      hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i]) 
     hdf.close() 
    return 
Simulation() 

Questo codice non esattamente quello che voglio, ma dal momento che il processo può richiedere un po 'di tempo a correre ho cercato di utilizzare il modulo multiprocessing e utilizzare il seguente codice:

import multiprocessing 
import tables as pt 

a = range(10) 
b = range(5) 

def Simulation(ii): 
    hdf = pt.openFile('simulation.h5',mode='w') 
    print(ii) 
     hdf.createGroup('/','A%s'%ii) 
     for i in b: 
      hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i]) 
     hdf.close() 
    return 

if __name__ == '__main__': 
    jobs = [] 
    for ii in a: 
     p = multiprocessing.Process(target=Simulation, args=(ii,)) 
     jobs.append(p)  
     p.start() 

Questo tuttavia stampa solo l'ultima simulazione sul file HDF, in qualche modo sovrascrive tutti gli altri gruppi.

risposta

10

Ogni volta che si apre un file in modalità scrittura (w), viene creato un nuovo file, quindi il contenuto del file viene perso se già esiste. Solo l'ultimo handle di file può scrivere correttamente nel file. Anche se lo hai modificato in modalità accodamento, non dovresti provare a scrivere sullo stesso file da più processi: l'output sarà confuso se due processi tentano di scrivere allo stesso tempo.

Invece, hanno tutti i processi di lavoro messi in uscita in una coda, e hanno un unico processo dedicato(o un sottoprocesso o il processo principale) di gestire l'uscita dalla coda e scrivere il file:


import multiprocessing as mp 
import tables as pt 


num_arrays = 100 
num_processes = mp.cpu_count() 
num_simulations = 1000 
sentinel = None 


def Simulation(inqueue, output): 
    for ii in iter(inqueue.get, sentinel): 
     output.put(('createGroup', ('/', 'A%s' % ii))) 
     for i in range(num_arrays): 
      output.put(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i]))) 


def handle_output(output): 
    hdf = pt.openFile('simulation.h5', mode='w') 
    while True: 
     args = output.get() 
     if args: 
      method, args = args 
      getattr(hdf, method)(*args) 
     else: 
      break 
    hdf.close() 

if __name__ == '__main__': 
    output = mp.Queue() 
    inqueue = mp.Queue() 
    jobs = [] 
    proc = mp.Process(target=handle_output, args=(output,)) 
    proc.start() 
    for i in range(num_processes): 
     p = mp.Process(target=Simulation, args=(inqueue, output)) 
     jobs.append(p) 
     p.start() 
    for i in range(num_simulations): 
     inqueue.put(i) 
    for i in range(num_processes): 
     # Send the sentinal to tell Simulation to end 
     inqueue.put(sentinel) 
    for p in jobs: 
     p.join() 
    output.put(None) 
    proc.join() 

Per confronto, qui è una versione che utilizza mp.Pool:

import multiprocessing as mp 
import tables as pt 


num_arrays = 100 
num_processes = mp.cpu_count() 
num_simulations = 1000 


def Simulation(ii): 
    result = [] 
    result.append(('createGroup', ('/', 'A%s' % ii))) 
    for i in range(num_arrays): 
     result.append(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i]))) 
    return result 


def handle_output(result): 
    hdf = pt.openFile('simulation.h5', mode='a') 
    for args in result: 
     method, args = args 
     getattr(hdf, method)(*args) 
    hdf.close() 


if __name__ == '__main__': 
    # clear the file 
    hdf = pt.openFile('simulation.h5', mode='w') 
    hdf.close() 
    pool = mp.Pool(num_processes) 
    for i in range(num_simulations): 
     pool.apply_async(Simulation, (i,), callback=handle_output) 
    pool.close() 
    pool.join() 

Sembra più semplice vero? Tuttavia c'è una differenza significativa. Il codice originale utilizzato output.put per inviare argomenti a handle_output che era in esecuzione nel proprio sottoprocesso. handle_output prenderà args dalla coda output e gestirli immediatamente. Con il codice Pool sopra, Simulation accumula un intero gruppo di args in result e result non viene inviato a handle_output fino a dopo i ritorni Simulation.

Se Simulation impiega molto tempo, ci sarà un lungo periodo di attesa mentre non viene scritto nulla su simulation.h5.

+0

Come aggiunta a questa domanda ho usato il codice sopra con successo ma ora sto espandendo questa simulazione, il ciclo for definito da a = range (1000) e anche il ciclo for definito da b = range (100). Questo howerver si traduce in un ampio uso della mia memoria. Ho 8 CPU con 16 Gb di RAM ma quando eseguo il file (anche senza le vere simulazioni) il mio utilizzo della RAM va al 100% e il mio sistema si ferma. – user2143958

+0

Penso che sia necessario separare il numero di sottoprocessi dal numero di attività. Sembra che tu voglia 1000 compiti, ma probabilmente non 1000 sottoprocessi. Modificherò il post per suggerire un modo per farlo. – unutbu

+0

Sì, hai ragione, nell'esempio precedente per le iterazioni di grandi dimensioni è stata creata una quantità di subprocessi ugualmente grande che intasava tutta la memoria. Il file che hai modificato funziona perfettamente! Ma solo per chiarimenti, stavo anche sperimentando la funzione Pool() e questa funzione sembra funzionare abbastanza bene anche se diventa più difficile quando è necessario passare più di una variabile. Qual è la ragione principale per scegliere la funzione Process() sulla funzione Pool()? – user2143958

Problemi correlati