Non sono un esperto di Python ma sono riuscito a scrivere un codice di multiprocessing che utilizza tutti i miei cpu e core nel mio PC. Il mio codice carica un array molto grande, circa 1,6 GB, e ho bisogno di aggiornare l'array in ogni processo. Fortunatamente, l'aggiornamento consiste nell'aggiungere alcune stelle artificiali all'immagine e ogni processo ha un diverso set di posizioni dell'immagine in cui aggiungere le stelle artificiali.Multiprocessing di Python e variabile condivisa
L'immagine è troppo grande e non riesco a crearne una nuova ogni volta che si chiama un processo. La mia soluzione era la creazione di una variabile nella memoria condivisa e risparmio molta memoria. Per qualche ragione, funziona per il 90% dell'immagine, ma ci sono regioni in cui il mio codice aggiunge numeri casuali in alcune delle posizioni che ho inviato prima ai processi. È legato al modo in cui creo una variabile condivisa? I processi interferiscono l'un l'altro durante l'esecuzione del mio codice?
Qualcosa di strano è che quando si usa una singola CPU e un singolo core, le immagini sono perfette al 100% e non ci sono numeri casuali aggiunti all'immagine. Mi suggerisci un modo per condividere una vasta gamma tra più processi? Qui la parte rilevante del mio codice. Per favore, leggi la riga quando definisco la variabile im_data.
import warnings
warnings.filterwarnings("ignore")
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
import matplotlib.pyplot as plt
import sys,os
import subprocess
import numpy as np
import time
import cv2 as cv
import pyfits
from pyfits import getheader
import multiprocessing, Queue
import ctypes
class Worker(multiprocessing.Process):
def __init__(self, work_queue, result_queue):
# base class initialization
multiprocessing.Process.__init__(self)
# job management stuff
self.work_queue = work_queue
self.result_queue = result_queue
self.kill_received = False
def run(self):
while not self.kill_received:
# get a task
try:
i_range, psf_file = self.work_queue.get_nowait()
except Queue.Empty:
break
# the actual processing
print "Adding artificial stars - index range=", i_range
radius=16
x_c,y_c=((psf_size[1]-1)/2, (psf_size[2]-1)/2)
x,y=np.meshgrid(np.arange(psf_size[1])-x_c,np.arange(psf_size[2])-y_c)
distance = np.sqrt(x**2 + y**2)
for i in range(i_range[0],i_range[1]):
psf_xy=np.zeros(psf_size[1:3], dtype=float)
j=0
for i_order in range(psf_order+1):
j_order=0
while (i_order+j_order < psf_order+1):
psf_xy += psf_data[j,:,:] * ((mock_y[i]-psf_offset[1])/psf_scale[1])**i_order * ((mock_x[i]-psf_offset[0])/psf_scale[0])**j_order
j_order+=1
j+=1
psf_factor=10.**((30.-mock_mag[i])/2.5)/np.sum(psf_xy)
psf_xy *= psf_factor
npsf_xy=cv.resize(psf_xy,(npsf_size[0],npsf_size[1]),interpolation=cv.INTER_LANCZOS4)
npsf_factor=10.**((30.-mock_mag[i])/2.5)/np.sum(npsf_xy)
npsf_xy *= npsf_factor
im_rangex=[max(mock_x[i]-npsf_size[1]/2,0), min(mock_x[i]-npsf_size[1]/2+npsf_size[1], im_size[1])]
im_rangey=[max(mock_y[i]-npsf_size[0]/2,0), min(mock_y[i]-npsf_size[0]/2+npsf_size[0], im_size[0])]
npsf_rangex=[max(-1*(mock_x[i]-npsf_size[1]/2),0), min(-1*(mock_x[i]-npsf_size[1]/2-im_size[1]),npsf_size[1])]
npsf_rangey=[max(-1*(mock_y[i]-npsf_size[0]/2),0), min(-1*(mock_y[i]-npsf_size[0]/2-im_size[0]),npsf_size[0])]
im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10.
self.result_queue.put(id)
if __name__ == "__main__":
n_cpu=2
n_core=6
n_processes=n_cpu*n_core*1
input_mock_file=sys.argv[1]
print "Reading file ", im_file[i]
hdu=pyfits.open(im_file[i])
data=hdu[0].data
im_size=data.shape
im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1])
im_data = np.ctypeslib.as_array(im_data_base.get_obj())
im_data = im_data.reshape(im_size[0], im_size[1])
im_data[:] = data
data=0
assert im_data.base.base is im_data_base.get_obj()
# run
# load up work queue
tic=time.time()
j_step=np.int(np.ceil(mock_n*1./n_processes))
j_range=range(0,mock_n,j_step)
j_range.append(mock_n)
work_queue = multiprocessing.Queue()
for j in range(np.size(j_range)-1):
if work_queue.full():
print "Oh no! Queue is full after only %d iterations" % j
work_queue.put((j_range[j:j+2], psf_file[i]))
# create a queue to pass to workers to store the results
result_queue = multiprocessing.Queue()
# spawn workers
for j in range(n_processes):
worker = Worker(work_queue, result_queue)
worker.start()
# collect the results off the queue
while not work_queue.empty():
result_queue.get()
print "Writing file ", mock_im_file[i]
hdu[0].data=im_data
hdu.writeto(mock_im_file[i])
print "%f s for parallel computation." % (time.time() - tic)
Invece di condividere array di grandi dimensioni, non è possibile suddividerli in subarray più piccoli e inviare questi subarray ai sottoprocessi? E poi unire i risultati alla matrice originale. – freakish
E considera anche l'utilizzo di qualcosa di diverso da Python per l'elaborazione di immagini così grandi (C addon?). – freakish