8

Ho affrontato questo problema per una settimana e sta diventando piuttosto frustrante perché ogni volta che implemento un esempio più semplice ma simile di quello che devo fare, si scopre che il multiprocessing lo fonderà. Il modo in cui gestisce la memoria condivisa mi sconcerta perché è così limitato che può diventare inutile abbastanza rapidamente.Un modo migliore per condividere la memoria per la multiprocessing in Python?

Quindi la descrizione di base del mio problema è che ho bisogno di creare un processo che viene passato in alcuni parametri per aprire un'immagine e creare circa 20K patch di dimensioni 60x40. Queste patch vengono salvate in un elenco 2 alla volta e devono essere restituite al thread principale per essere quindi elaborate di nuovo da altri due processi simultanei eseguiti sulla GPU.

Il processo, il flusso di lavoro e tutto ciò di cui ci si preoccupa maggiormente, ciò di cui ho bisogno ora è che la parte che doveva essere la più semplice si sta rivelando la più difficile. Non sono stato in grado di salvare e ottenere la lista con patch 20K sul thread principale.

Il primo problema era perché stavo salvando queste patch come immagini PIL. Ho poi scoperto che tutti i dati aggiunti a un oggetto Queue devono essere decapitati. Secondo problema, ho quindi convertito le patch in un array di 60x40 ciascuna e le ho salvate in un elenco. E ora che ancora non funziona? Apparentemente le code hanno una quantità limitata di dati che possono essere salvati diversamente quando chiamate queue_obj.get() il programma si blocca.

Ho provato molte altre cose e ogni nuova cosa che provo non funziona, quindi mi piacerebbe sapere se qualcuno ha altre raccomandazioni di una libreria che posso usare per condividere oggetti senza tutto il fuzz?

Ecco un esempio di implementazione di tipo di ciò che sto guardando. Tieni presente che funziona perfettamente, ma la piena implementazione no. E ho il codice di stampare i messaggi informativi per vedere che i dati salvati hanno la stessa forma e tutto, ma per qualche motivo non funziona. Nell'implementazione completa il processo indipendente viene completato correttamente ma si blocca su q.get().

from PIL import Image 
from multiprocessing import Queue, Process 
import StringIO 
import numpy 

img = Image.open("/path/to/image.jpg") 
q = Queue() 
q2 = Queue() 
# 
# 
# MAX Individual Queue limit for 60x40 images in BW is 31,466. 
# Multiple individual Queues can be filled to the max limit of 31,466. 
# A single Queue can only take up to 31,466, even if split up in different puts. 
def rz(patch, qn1, qn2): 
    totalPatchCount = 20000 
    channels = 1 
    patch = patch.resize((60,40), Image.ANTIALIAS) 
    patch = patch.convert('L') 
    # ImgArray = numpy.asarray(im, dtype=numpy.float32) 
    list_im_arr = [] 
    # ----Create a 4D Array 
    # returnImageArray = numpy.zeros(shape=(totalPatchCount, channels, 40, 60)) 
    imgArray = numpy.asarray(patch, dtype=numpy.float32) 
    imgArray = imgArray[numpy.newaxis, ...] 
    # ----End 4D array 
    # list_im_arr2 = [] 
    for i in xrange(totalPatchCount): 
     # returnImageArray[i] = imgArray 
     list_im_arr.append(imgArray) 
    qn1.put(list_im_arr) 
    qn1.cancel_join_thread() 
    # qn2.cancel_join_thread() 
    print "PROGRAM Done" 

# rz(img,q,q2) 
# l = q.get() 

# 
p = Process(target=rz,args=(img, q, q2,)) 
p.start() 
p.join() 
# 
# # l = [] 
# # for i in xrange(1000): l.append(q.get()) 
# 
imdata = q.get() 
+0

Intendi interrompere un'immagine in molte patch (tile) e salvarle come elenco di array e che desideri utilizzare più thread per accelerare questo processo? – user3667217

+0

Ogni immagine è suddivisa in molte patch e salvata come un singolo elenco di 20K patch per immagine. Questa parte è tutto fatto in un unico processo, non ho bisogno di dividere i dati, ho solo bisogno di ottenere quella lista specifica creata sul thread principale. Quindi i processi multipli creerebbero più elenchi di patch da 20K ciascuno e li rimandano al programma principale per elaborare ora 2 di questi elenchi alla volta sulle mie 2 GPU. – alfredox

risposta

6

La coda è per la comunicazione tra i processi. Nel tuo caso, non hai davvero questo tipo di comunicazione. Puoi semplicemente lasciare che il processo restituisca il risultato e utilizzare il metodo .get() per raccoglierli. (Ricordarsi di aggiungere if __name__ == "main":, vedere programming guideline)

from PIL import Image 
from multiprocessing import Pool, Lock 
import numpy 

img = Image.open("/path/to/image.jpg") 

def rz(): 
    totalPatchCount = 20000 
    imgArray = numpy.asarray(patch, dtype=numpy.float32) 
    list_im_arr = [imgArray] * totalPatchCount # A more elegant way than a for loop 
    return list_im_arr 

if __name__ == '__main__': 
    # patch = img.... Your code to get generate patch here 
    patch = patch.resize((60,40), Image.ANTIALIAS) 
    patch = patch.convert('L') 

    pool = Pool(2) 
    imdata = [pool.apply_async(rz).get() for x in range(2)] 
    pool.close() 
    pool.join() 

Ora, secondo prima risposta di questo post, multiprocessing passare solo gli oggetti che è in serializzabili. Il decapaggio è probabilmente inevitabile in multiprocessing perché i processi non condividono la memoria. Semplicemente non vivono nello stesso universo. (Hanno ereditato memoria quando vengono generati per la prima volta, ma non possono raggiungere il loro universo). L'oggetto immagine PIL stesso non è selezionabile. Puoi renderlo selezionabile estraendo solo i dati dell'immagine memorizzati in esso, come suggerito da questo post.

Poiché il problema è principalmente legato all'I/O, è possibile provare anche il multi-threading. Potrebbe essere ancora più veloce per il tuo scopo. I thread condividono tutto così non è richiesto il decapaggio. Se stai usando python 3, lo ThreadPoolExecutor è uno strumento meraviglioso. Per Python 2, puoi usare ThreadPool. Per ottenere una maggiore efficienza, dovrai riorganizzare il modo in cui fai le cose, vuoi rompere il processo e lasciare che diversi processi facciano il loro lavoro.

from PIL import Image 
from multiprocessing.pool import ThreadPool 
from multiprocessing import Lock 
import numpy 

img = Image.open("/path/to/image.jpg") 
lock = Lock(): 
totalPatchCount = 20000 

def rz(x): 
    patch = ... 
    return patch 

pool = ThreadPool(8) 
imdata = [pool.map(rz, range(totalPatchCount)) for i in range(2)] 
pool.close() 
pool.join() 
+0

stai passando solo un argomento in pool.apply_asyn (rz, args = (x,)), ma rz ne prende due, è corretto? Inoltre ho provato questo e ho avuto un errore che i dati non possono essere decapitati. Quindi, anche se ottieni dati tramite un metodo pool.get(), deve ancora essere sottoposto al decapaggio? – alfredox

+0

No, non è corretto. Ho fatto alcuni errori di fretta. Ho aggiornato il mio codice. Niente dovrebbe essere messo in salamoia nel mio codice. Puoi pubblicare la riga che ti dà effettivamente l'errore? – user3667217

+0

Questo è quello che sto ricevendo: '' AttributeError Traceback (chiamata più recente scorso) in () 18 imdata = [] ---> 20 con piscina (processi = 2) come piscina: 21 per x nella gamma (2): 22 res = pool.apply_asyn (rz, args = (patch, x)) AttributeError: __exit__' – alfredox

1

Tu dici "A quanto pare code sono una quantità limitata di dati che possono salvare altrimenti quando si chiama queue_obj.get() il programma si blocca.. ".

Hai ragione e sbagliato non v'è una quantità limitata di informazioni del Queue terrà senza essere drenato Il problema è che quando si fa:

qn1.put(list_im_arr) 
qn1.cancel_join_thread() 

pianifica la comunicazione al sottostante pipe (gestito da un thread), quindi "but it's cool if we exit without the scheduled put completing" e, successivamente, pochi microsecondi dopo, la funzione worker viene chiusa e le uscite Process (senza attendere che il thread che sta compilando il pipe lo faccia effettivamente; potrebbe aver inviato i byte iniziali dell'oggetto, ma quasi tutto ciò che non rientra in PIPE_BUF caduto; avresti bisogno di sorprendenti condizioni di gara per ottenere qualcosa, per non parlare di tutto un grande oggetto). Così più tardi, quando si esegue:

imdata = q.get() 

nulla è stato effettivamente inviato dal (ormai uscito) Process. Quando chiami q.get() è in attesa di dati che non sono mai stati effettivamente trasmessi.

L'altra risposta è corretta che nel caso di calcolo e trasporto di un singolo valore, Queue s sono eccessivo. Ma se hai intenzione di usarli, devi usarli correttamente. La correzione potrebbe essere quella di:

  1. Rimuovere la chiamata alla qn1.cancel_join_thread() in modo che il Process non uscire fino a quando i dati sono stati trasmessi attraverso il tubo.
  2. Riorganizzare le chiamate per evitare stallo

Riorganizzare è proprio questo:

p = Process(target=rz,args=(img, q, q2,)) 
p.start() 

imdata = q.get() 
p.join() 

movimento p.join() dopo q.get(); se provi prima a join, il tuo processo principale aspetterà che il bambino esca, e il bambino aspetterà che la coda venga consumata prima che esca (potrebbe funzionare se la pipa di Queue viene svuotata da un thread nel processo principale, ma è meglio non contare su dettagli di implementazione come questo, questo modulo è corretto a prescindere dai dettagli di implementazione, purché corrispondano a put se get s).

+0

Sì, l'ho provato sia con che senza qn1.cancel_join_thread(), ma chiamerei sempre p.join() per attendere il completamento del processo prima di uscire. Grazie per la fantastica spiegazione, sto ancora cercando di far funzionare il codice suggerito. Ti farò sapere come va. – alfredox

Problemi correlati