2013-04-12 8 views
11

Ho letto alcune domande su SO riguardo la condivisione di array e sembra abbastanza semplice per array semplici ma sono bloccato cercando di farlo funzionare per l'array che ho.Rendere condiviso il mio array NumPy attraverso i processi

import numpy as np 
data=np.zeros(250,dtype='float32, (250000,2)float32') 

Ho provato a convertire questo in un array condiviso da cercando di fare in qualche modo mp.Array accettare il data, ho anche provato a generare la matrice come l'utilizzo ctypes come tale:

import multiprocessing as mp 
data=mp.Array('c_float, (250000)c_float',250) 

L'unico modo che sono riuscito a far funzionare il mio codice non passando i dati alla funzione ma passando una stringa codificata per essere decompressi/decodificati, questo tuttavia finirebbe con il chiamare in n (numero di stringhe) processi che sembrano ridondanti. La mia implementazione desiderata si basa sulla divisione dell'elenco di stringhe binarie in x (numero di processi) e passando questo chunk, data e un index ai processi che funzionano tranne che data viene modificato localmente, da qui la domanda su come renderlo condiviso, qualsiasi esempio di funzionamento con una matrice numpy personalizzata (annidata) sarebbe già di grande aiuto.

PS: Questa domanda è un follow-up da Python multi-processing

+1

Che cos'è 'mp'? Il modulo 'multiprocessing'? – delnan

+0

sì, lo aggiungerò alla domanda –

risposta

10

Nota che si può iniziare con una serie di DTYPE complesso:

In [4]: data = np.zeros(250,dtype='float32, (250000,2)float32') 

e vederlo come un array di DTYPE omogenea:

In [5]: data2 = data.view('float32') 

e più tardi, riconvertirlo in DTYPE complesso:

In [7]: data3 = data2.view('float32, (250000,2)float32') 

La modifica del dtype è un'operazione molto rapida; non influenza i dati sottostanti, solo il modo in cui NumPy lo interpreta. Quindi cambiare il dtype è praticamente senza costi.

Quindi quello che hai letto sugli array con dtypes semplici (omogenei) può essere prontamente applicato al tuo complesso dtype con il trucco di cui sopra.


Il codice seguente prende in prestito molte idee da J.F. Sebastian's answer, here.

import numpy as np 
import multiprocessing as mp 
import contextlib 
import ctypes 
import struct 
import base64 


def decode(arg): 
    chunk, counter = arg 
    print len(chunk), counter 
    for x in chunk: 
     peak_counter = 0 
     data_buff = base64.b64decode(x) 
     buff_size = len(data_buff)/4 
     unpack_format = ">%dL" % buff_size 
     index = 0 
     for y in struct.unpack(unpack_format, data_buff): 
      buff1 = struct.pack("I", y) 
      buff2 = struct.unpack("f", buff1)[0] 
      with shared_arr.get_lock(): 
       data = tonumpyarray(shared_arr).view(
        [('f0', '<f4'), ('f1', '<f4', (250000, 2))]) 
       if (index % 2 == 0): 
        data[counter][1][peak_counter][0] = float(buff2) 
       else: 
        data[counter][1][peak_counter][1] = float(buff2) 
        peak_counter += 1 
      index += 1 
     counter += 1 


def pool_init(shared_arr_): 
    global shared_arr 
    shared_arr = shared_arr_ # must be inherited, not passed as an argument 


def tonumpyarray(mp_arr): 
    return np.frombuffer(mp_arr.get_obj()) 


def numpy_array(shared_arr, peaks): 
    """Fills the NumPy array 'data' with m/z-intensity values acquired 
    from b64 decoding and unpacking the binary string read from the 
    mzXML file, which is stored in the list 'peaks'. 

    The m/z values are assumed to be ordered without validating this 
    assumption. 

    Note: This function uses multi-processing 
    """ 
    processors = mp.cpu_count() 
    with contextlib.closing(mp.Pool(processes=processors, 
            initializer=pool_init, 
            initargs=(shared_arr,))) as pool: 
     chunk_size = int(len(peaks)/processors) 
     map_parameters = [] 
     for i in range(processors): 
      counter = i * chunk_size 
      # WARNING: I removed -1 from (i + 1)*chunk_size, since the right 
      # index is non-inclusive. 
      chunk = peaks[i*chunk_size : (i + 1)*chunk_size] 
      map_parameters.append((chunk, counter)) 
     pool.map(decode, map_parameters) 

if __name__ == '__main__': 
    shared_arr = mp.Array(ctypes.c_float, (250000 * 2 * 250) + 250) 
    peaks = ... 
    numpy_array(shared_arr, peaks) 

Se è possibile garantire che i vari processi che eseguono le assegnazioni

if (index % 2 == 0): 
    data[counter][1][peak_counter][0] = float(buff2) 
else: 
    data[counter][1][peak_counter][1] = float(buff2) 

mai competere per alterare i dati nelle stesse posizioni, allora credo che si può effettivamente rinunciare utilizzando il blocco della

with shared_arr.get_lock(): 

ma io non digito il codice abbastanza bene da sapere per certo, quindi per essere al sicuro, ho incluso il blocco.

+0

È garantito che non vorranno mai accedere allo stesso "intervallo" di dati a causa del contatore che passo alla funzione (che viene calcolata facendo "i * chunk_size").Proverò la tua risposta domani mattina e molto probabilmente accetterò questa risposta. –

0
from multiprocessing import Process, Array 
import numpy as np 
import time 
import ctypes 

def fun(a): 
    a[0] = -a[0] 
    while 1: 
     time.sleep(2) 
     #print bytearray(a.get_obj()) 
     c=np.frombuffer(a.get_obj(),dtype=np.float32) 
     c.shape=3,3 
     print 'haha',c 


def main(): 
    a = np.random.rand(3,3).astype(np.float32) 
    a.shape=1*a.size 
    #a=np.array([[1,3,4],[4,5,6]]) 
    #b=bytearray(a) 
    h=Array(ctypes.c_float,a) 
    print "Originally,",h 

    # Create, start, and finish the child process 
    p = Process(target=fun, args=(h,)) 
    p.start() 
    #p.join() 
    a.shape=3,3 
    # Print out the changed values 
    print 'first',a 
    time.sleep(3) 
    #h[0]=h[0]+1 
    print 'main',np.frombuffer(h.get_obj(), dtype=np.float32) 



if __name__=="__main__": 
    main()