2012-04-01 23 views
19

Sto usando il modulo multiprocessing di Python per elaborare array numpy di grandi dimensioni in parallelo. Gli array sono mappati in memoria usando numpy.load(mmap_mode='r') nel processo master. Successivamente, multiprocessing.Pool() avvia il processo (presumo).NumPy vs. multiprocessing e mmap

Tutto sembra funzionare bene, tranne io sono linee ottenendo come:

AttributeError ("oggetto 'NoneType' non ha alcun attributo 'dire'",) in <bound method memmap.__del__ of memmap([ 0.57735026, 0.57735026, 0.57735026, 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ], dtype=float32)> ignorato

nel registri di unittest. I test passano bene, tuttavia.

Qualche idea di cosa sta succedendo lì?

Utilizzo di Python 2.7.2, OS X, NumPy 1.6.1.


UPDATE:

Dopo debugging, ho braccati la causa di un percorso di codice che utilizzava un (piccolo fetta di) questo array NumPy mappato in memoria come input per una chiamata Pool.imap.

Apparentemente il "problema" riguarda il modo in cui multiprocessing.Pool.imap passa il suo input ai nuovi processi: utilizza pickle. Questo non funziona con gli array numpy e mmap e qualcosa all'interno di interruzioni che porta all'errore.

Ho trovato this reply di Robert Kern che sembra affrontare lo stesso problema. Suggerisce di creare un percorso di codice speciale per quando l'input imap proviene da un array mappato in memoria: la mappatura della memoria viene eseguita manualmente nel processo generato.

Questo sarebbe così complicato e brutto che preferirei vivere con l'errore e le copie di memoria extra. C'è un altro modo che sarebbe più leggero sulla modifica del codice esistente?

risposta

22

Il mio approccio abituale (se si può vivere con copie di memoria extra) è fare tutto IO in un processo e quindi inviare le cose a un pool di thread di lavoro. Per caricare una porzione di un array memmapped nella memoria basta fare x = np.array(data[yourslice]) (data[yourslice].copy() in realtà non farlo, il che può portare ad una certa confusione.).

Prima di tutto, cerchiamo di generare alcuni dati di test:

import numpy as np 
np.random.random(10000).tofile('data.dat') 

È possibile riprodurre i vostri errori con qualcosa di simile:

import numpy as np 
import multiprocessing 

def main(): 
    data = np.memmap('data.dat', dtype=np.float, mode='r') 
    pool = multiprocessing.Pool() 
    results = pool.imap(calculation, chunks(data)) 
    results = np.fromiter(results, dtype=np.float) 

def chunks(data, chunksize=100): 
    """Overly-simple chunker...""" 
    intervals = range(0, data.size, chunksize) + [None] 
    for start, stop in zip(intervals[:-1], intervals[1:]): 
     yield data[start:stop] 

def calculation(chunk): 
    """Dummy calculation.""" 
    return chunk.mean() - chunk.std() 

if __name__ == '__main__': 
    main() 

E se si passa a cedere np.array(data[start:stop]), invece, ti correggi il problema:

import numpy as np 
import multiprocessing 

def main(): 
    data = np.memmap('data.dat', dtype=np.float, mode='r') 
    pool = multiprocessing.Pool() 
    results = pool.imap(calculation, chunks(data)) 
    results = np.fromiter(results, dtype=np.float) 

def chunks(data, chunksize=100): 
    """Overly-simple chunker...""" 
    intervals = range(0, data.size, chunksize) + [None] 
    for start, stop in zip(intervals[:-1], intervals[1:]): 
     yield np.array(data[start:stop]) 

def calculation(chunk): 
    """Dummy calculation.""" 
    return chunk.mean() - chunk.std() 

if __name__ == '__main__': 
    main() 

Naturalmente, questo fa un extra copia in memoria di ogni blocco.

A lungo termine, probabilmente scoprirai che è più semplice passare da file memmapped e passare a qualcosa come HDF. Questo è particolarmente vero se i tuoi dati sono multidimensionali. (Lo raccomando h5py, ma pyTables è bello se i dati sono "simili a tabelle".)

Buona fortuna, in ogni caso!

+0

Joe le tue risposte sempre rock. Ho appena provato a immaginare qualcosa di simile. – YXD

+0

Grazie per il suggerimento HDF. Sembra un grande cambiamento ma potrebbe valerne la pena, lo controllerò. – user124114