Nota che si può iniziare con una serie di DTYPE complesso:
In [4]: data = np.zeros(250,dtype='float32, (250000,2)float32')
e vederlo come un array di DTYPE omogenea:
In [5]: data2 = data.view('float32')
e più tardi, riconvertirlo in DTYPE complesso:
In [7]: data3 = data2.view('float32, (250000,2)float32')
La modifica del dtype è un'operazione molto rapida; non influenza i dati sottostanti, solo il modo in cui NumPy lo interpreta. Quindi cambiare il dtype è praticamente senza costi.
Quindi quello che hai letto sugli array con dtypes semplici (omogenei) può essere prontamente applicato al tuo complesso dtype con il trucco di cui sopra.
Il codice seguente prende in prestito molte idee da J.F. Sebastian's answer, here.
import numpy as np
import multiprocessing as mp
import contextlib
import ctypes
import struct
import base64
def decode(arg):
chunk, counter = arg
print len(chunk), counter
for x in chunk:
peak_counter = 0
data_buff = base64.b64decode(x)
buff_size = len(data_buff)/4
unpack_format = ">%dL" % buff_size
index = 0
for y in struct.unpack(unpack_format, data_buff):
buff1 = struct.pack("I", y)
buff2 = struct.unpack("f", buff1)[0]
with shared_arr.get_lock():
data = tonumpyarray(shared_arr).view(
[('f0', '<f4'), ('f1', '<f4', (250000, 2))])
if (index % 2 == 0):
data[counter][1][peak_counter][0] = float(buff2)
else:
data[counter][1][peak_counter][1] = float(buff2)
peak_counter += 1
index += 1
counter += 1
def pool_init(shared_arr_):
global shared_arr
shared_arr = shared_arr_ # must be inherited, not passed as an argument
def tonumpyarray(mp_arr):
return np.frombuffer(mp_arr.get_obj())
def numpy_array(shared_arr, peaks):
"""Fills the NumPy array 'data' with m/z-intensity values acquired
from b64 decoding and unpacking the binary string read from the
mzXML file, which is stored in the list 'peaks'.
The m/z values are assumed to be ordered without validating this
assumption.
Note: This function uses multi-processing
"""
processors = mp.cpu_count()
with contextlib.closing(mp.Pool(processes=processors,
initializer=pool_init,
initargs=(shared_arr,))) as pool:
chunk_size = int(len(peaks)/processors)
map_parameters = []
for i in range(processors):
counter = i * chunk_size
# WARNING: I removed -1 from (i + 1)*chunk_size, since the right
# index is non-inclusive.
chunk = peaks[i*chunk_size : (i + 1)*chunk_size]
map_parameters.append((chunk, counter))
pool.map(decode, map_parameters)
if __name__ == '__main__':
shared_arr = mp.Array(ctypes.c_float, (250000 * 2 * 250) + 250)
peaks = ...
numpy_array(shared_arr, peaks)
Se è possibile garantire che i vari processi che eseguono le assegnazioni
if (index % 2 == 0):
data[counter][1][peak_counter][0] = float(buff2)
else:
data[counter][1][peak_counter][1] = float(buff2)
mai competere per alterare i dati nelle stesse posizioni, allora credo che si può effettivamente rinunciare utilizzando il blocco della
with shared_arr.get_lock():
ma io non digito il codice abbastanza bene da sapere per certo, quindi per essere al sicuro, ho incluso il blocco.
Che cos'è 'mp'? Il modulo 'multiprocessing'? – delnan
sì, lo aggiungerò alla domanda –