2013-04-09 26 views
5

Non sono un esperto di Python ma sono riuscito a scrivere un codice di multiprocessing che utilizza tutti i miei cpu e core nel mio PC. Il mio codice carica un array molto grande, circa 1,6 GB, e ho bisogno di aggiornare l'array in ogni processo. Fortunatamente, l'aggiornamento consiste nell'aggiungere alcune stelle artificiali all'immagine e ogni processo ha un diverso set di posizioni dell'immagine in cui aggiungere le stelle artificiali.Multiprocessing di Python e variabile condivisa

L'immagine è troppo grande e non riesco a crearne una nuova ogni volta che si chiama un processo. La mia soluzione era la creazione di una variabile nella memoria condivisa e risparmio molta memoria. Per qualche ragione, funziona per il 90% dell'immagine, ma ci sono regioni in cui il mio codice aggiunge numeri casuali in alcune delle posizioni che ho inviato prima ai processi. È legato al modo in cui creo una variabile condivisa? I processi interferiscono l'un l'altro durante l'esecuzione del mio codice?

Qualcosa di strano è che quando si usa una singola CPU e un singolo core, le immagini sono perfette al 100% e non ci sono numeri casuali aggiunti all'immagine. Mi suggerisci un modo per condividere una vasta gamma tra più processi? Qui la parte rilevante del mio codice. Per favore, leggi la riga quando definisco la variabile im_data.

import warnings 
warnings.filterwarnings("ignore") 

from mpl_toolkits.mplot3d import Axes3D 
from matplotlib import cm 
import matplotlib.pyplot as plt 
import sys,os 
import subprocess 
import numpy as np 
import time 
import cv2 as cv 
import pyfits 
from pyfits import getheader 
import multiprocessing, Queue 
import ctypes 

class Worker(multiprocessing.Process): 


def __init__(self, work_queue, result_queue): 

    # base class initialization 
    multiprocessing.Process.__init__(self) 

    # job management stuff 
    self.work_queue = work_queue 
    self.result_queue = result_queue 
    self.kill_received = False 

def run(self): 
    while not self.kill_received: 

     # get a task 
     try: 
      i_range, psf_file = self.work_queue.get_nowait() 
     except Queue.Empty: 
      break 

     # the actual processing 
     print "Adding artificial stars - index range=", i_range 

     radius=16 
     x_c,y_c=((psf_size[1]-1)/2, (psf_size[2]-1)/2) 
     x,y=np.meshgrid(np.arange(psf_size[1])-x_c,np.arange(psf_size[2])-y_c) 
     distance = np.sqrt(x**2 + y**2) 

     for i in range(i_range[0],i_range[1]): 
      psf_xy=np.zeros(psf_size[1:3], dtype=float) 
      j=0 
      for i_order in range(psf_order+1): 
       j_order=0 
       while (i_order+j_order < psf_order+1): 
        psf_xy += psf_data[j,:,:] * ((mock_y[i]-psf_offset[1])/psf_scale[1])**i_order * ((mock_x[i]-psf_offset[0])/psf_scale[0])**j_order 
        j_order+=1 
        j+=1 


      psf_factor=10.**((30.-mock_mag[i])/2.5)/np.sum(psf_xy) 
      psf_xy *= psf_factor 

      npsf_xy=cv.resize(psf_xy,(npsf_size[0],npsf_size[1]),interpolation=cv.INTER_LANCZOS4) 
      npsf_factor=10.**((30.-mock_mag[i])/2.5)/np.sum(npsf_xy) 
      npsf_xy *= npsf_factor 

      im_rangex=[max(mock_x[i]-npsf_size[1]/2,0), min(mock_x[i]-npsf_size[1]/2+npsf_size[1], im_size[1])] 
      im_rangey=[max(mock_y[i]-npsf_size[0]/2,0), min(mock_y[i]-npsf_size[0]/2+npsf_size[0], im_size[0])] 
      npsf_rangex=[max(-1*(mock_x[i]-npsf_size[1]/2),0), min(-1*(mock_x[i]-npsf_size[1]/2-im_size[1]),npsf_size[1])] 
      npsf_rangey=[max(-1*(mock_y[i]-npsf_size[0]/2),0), min(-1*(mock_y[i]-npsf_size[0]/2-im_size[0]),npsf_size[0])] 

      im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10. 


     self.result_queue.put(id) 

if __name__ == "__main__": 

    n_cpu=2 
    n_core=6 
    n_processes=n_cpu*n_core*1 
    input_mock_file=sys.argv[1] 

    print "Reading file ", im_file[i] 
    hdu=pyfits.open(im_file[i]) 
    data=hdu[0].data 
    im_size=data.shape 

    im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1]) 
    im_data = np.ctypeslib.as_array(im_data_base.get_obj()) 
    im_data = im_data.reshape(im_size[0], im_size[1]) 
    im_data[:] = data 
    data=0 
    assert im_data.base.base is im_data_base.get_obj() 

    # run 
    # load up work queue 
    tic=time.time() 
    j_step=np.int(np.ceil(mock_n*1./n_processes)) 
    j_range=range(0,mock_n,j_step) 
    j_range.append(mock_n) 


    work_queue = multiprocessing.Queue() 
    for j in range(np.size(j_range)-1): 
    if work_queue.full(): 
     print "Oh no! Queue is full after only %d iterations" % j 
    work_queue.put((j_range[j:j+2], psf_file[i])) 

    # create a queue to pass to workers to store the results 
    result_queue = multiprocessing.Queue() 

    # spawn workers 
    for j in range(n_processes): 
    worker = Worker(work_queue, result_queue) 
    worker.start() 

    # collect the results off the queue 
    while not work_queue.empty(): 
    result_queue.get() 

    print "Writing file ", mock_im_file[i] 
    hdu[0].data=im_data 
    hdu.writeto(mock_im_file[i]) 
    print "%f s for parallel computation." % (time.time() - tic) 
+1

Invece di condividere array di grandi dimensioni, non è possibile suddividerli in subarray più piccoli e inviare questi subarray ai sottoprocessi? E poi unire i risultati alla matrice originale. – freakish

+0

E considera anche l'utilizzo di qualcosa di diverso da Python per l'elaborazione di immagini così grandi (C addon?). – freakish

risposta

3

Credo che il problema (come lei ha suggerito che nella sua interrogazione) deriva dal fatto che siete scrittura nello stesso array da più thread.

im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1]) 
im_data = np.ctypeslib.as_array(im_data_base.get_obj()) 
im_data = im_data.reshape(im_size[0], im_size[1]) 
im_data[:] = data 

Anche se sono abbastanza sicuro che si potrebbe scrivere in im_data_base in maniera "processo-safe" (un blocco implicita viene utilizzato da python per sincronizzare l'accesso alla matrice), io non sono sicuro che si può scrivere in im_data in modo sicuro dal punto di vista del processo.

Vorrei quindi (anche se io non sono sicuro che risolvere il problema) consiglia di creare un blocco esplicito intorno im_data

# Disable python implicit lock, we are going to use our own 
im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1], 
    lock=False) 
im_data = np.ctypeslib.as_array(im_data_base.get_obj()) 
im_data = im_data.reshape(im_size[0], im_size[1]) 
im_data[:] = data 
# Create our own lock 
im_data_lock = Lock() 

Poi nei processi, acquisire il blocco ogni volta che è necessario modificare im_data

self.im_data_lock.acquire() 
im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10 
self.im_data_lock.release() 

ho omesso il codice per passare la serratura alla contructor del processo e memorizzarlo come campo membro (self.im_data_lock) per motivi di brevità. È inoltre necessario passare l'array im_data al costruttore del processo e memorizzarlo come campo membro.

1

Il problema si verifica nell'esempio in cui più thread scrivono in regioni sovrapposte nell'immagine/matrice. Quindi, in effetti, devi mettere un blocco per immagine o creare un set di blocchi per sezioni di immagini (per ridurre il conflitto di blocco).

Oppure è possibile produrre modifiche dell'immagine in un set di processi e apportare la modifica effettiva dell'immagine in un singolo thread separato.

Problemi correlati