2013-02-27 12 views
8

Viene visualizzato un errore di sistema (mostrato di seguito) durante l'esecuzione di alcuni semplici calcoli di algebra di matrice basati su numpy in parallelo utilizzando il pacchetto Multiprocessing (python 2.73 con numpy 1.7.0 su Ubuntu 12.04 su Amazon EC2) . Il mio codice funziona bene per le dimensioni di matrice più piccoli, ma si blocca per quelli più grandi (con un sacco di memoria disponibile)Errore di sistema durante l'esecuzione di sottoprocessi mediante Multiprocessing

La dimensione delle matrici che uso è notevole (il mio codice viene eseguito bene per 1000000x10 galleggiante matrici denso ma si blocca per 1000000x500 quelli - I sto passando queste matrici a/da sottoprocessi a proposito). 10 vs 500 è un parametro run-time, tutto il resto rimane invariato (dati di input, altri parametri di run-time ecc.)

Ho anche provato a eseguire lo stesso codice (porting) utilizzando python3 - per matrici più grandi i sottoprocessi passano in modalità sleep/idle (invece di crashing come in python 2.7) e il programma/sottoprocessi si bloccano lì senza fare nulla. Per matrici più piccole il codice funziona bene con python3.

Tutti i suggerimenti sarebbero molto apprezzati (I sono a corto di idee qui)

messaggio

errore:

Exception in thread Thread-5: Traceback (most recent call last): 
File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner 
    self.run() File "/usr/lib/python2.7/threading.py", line 504, in run 
    self.__target(*self.__args, **self.__kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks 
    put(task) SystemError: NULL result without error in PyObject_Call 

Il codice Multiprocessing io uso:

def runProcessesInParallelAndReturn(proc, listOfInputs, nParallelProcesses): 
    if len(listOfInputs) == 0: 
     return 
    # Add result queue to the list of argument tuples. 
    resultQueue = mp.Manager().Queue() 
    listOfInputsNew = [(argumentTuple, resultQueue) for argumentTuple in listOfInputs] 
    # Create and initialize the pool of workers. 
    pool = mp.Pool(processes = nParallelProcesses) 
    pool.map(proc, listOfInputsNew) 
    # Run the processes. 
    pool.close() 
    pool.join() 
    # Return the results. 
    return [resultQueue.get() for i in range(len(listOfInputs))] 

riportano di seguito le " proc "che viene eseguito per ogni sottoprocesso. Fondamentalmente, risolve molti sistemi di equazioni lineari usando numpy (costruisce le matrici richieste all'interno del sottoprocesso) e restituisce i risultati come un'altra matrice. Ancora una volta, funziona bene per valori più piccoli di un parametro di runtime ma si blocca (o si blocca in python3) per quelli più grandi.

def solveForLFV(param): 
    startTime = time.time() 
    (chunkI, LFVin, XY, sumLFVinOuterProductLFVallPlusPenaltyTerm, indexByIndexPurch, outerProductChunkSize, confWeight), queue = param 
    LFoutChunkSize = XY.shape[0] 
    nLFdim = LFVin.shape[1] 
    sumLFVinOuterProductLFVpurch = np.zeros((nLFdim, nLFdim)) 
    LFVoutChunk = np.zeros((LFoutChunkSize, nLFdim)) 
    for LFVoutIndex in xrange(LFoutChunkSize): 
     LFVInIndexListPurch = indexByIndexPurch[LFVoutIndex] 
     sumLFVinOuterProductLFVpurch[:, :] = 0. 
     LFVInIndexChunkLow, LFVInIndexChunkHigh = getChunkBoundaries(len(LFVInIndexListPurch), outerProductChunkSize) 
     for LFVInIndexChunkI in xrange(len(LFVInIndexChunkLow)): 
      LFVinSlice = LFVin[LFVInIndexListPurch[LFVInIndexChunkLow[LFVInIndexChunkI] : LFVInIndexChunkHigh[LFVInIndexChunkI]], :] 
      sumLFVinOuterProductLFVpurch += sum(LFVinSlice[:, :, np.newaxis] * LFVinSlice[:, np.newaxis, :]) 
     LFVoutChunk[LFVoutIndex, :] = np.linalg.solve(confWeight * sumLFVinOuterProductLFVpurch + sumLFVinOuterProductLFVallPlusPenaltyTerm, XY[LFVoutIndex, :]) 
    queue.put((chunkI, LFVoutChunk)) 
    print 'solveForLFV: ', time.time() - startTime, 'sec' 
    sys.stdout.flush() 
+0

Puoi condividere il codice per la funzione proc? – barracel

+0

Proprio fatto. Non ho descritto gli argomenti del proc: alcuni di essi sono matrici, alcuni sono elenchi di elenchi e alcuni sono semplicemente float/interi. 'queue' è usato per restituire i risultati da ciascun sottoprocesso. – Yevgeny

risposta

5

500.000.000 è abbastanza grande: se si sta utilizzando float64, che è di 4 miliardi di byte o circa 4 GB. (L'array float da 10.000.000 sarebbe 80 milioni di byte, o circa 80 MB - molto più piccolo.) Mi aspetto che il problema abbia qualcosa a che fare con il multiprocessing cercando di mettere sottoschi gli array da inviare ai subprocessi su un pipe.

Dato che ci si trova su una piattaforma Unix, è possibile evitare questo comportamento sfruttando il comportamento di ereditarietà della memoria di fork() (utilizzato per creare gli operatori di multiprocessing). Ho avuto un grande successo con questo hack (strappato su this project), descritto dai commenti.

### A helper for letting the forked processes use data without pickling. 
_data_name_cands = (
    '_data_' + ''.join(random.sample(string.ascii_lowercase, 10)) 
    for _ in itertools.count()) 
class ForkedData(object): 
    ''' 
    Class used to pass data to child processes in multiprocessing without 
    really pickling/unpickling it. Only works on POSIX. 

    Intended use: 
     - The master process makes the data somehow, and does e.g. 
      data = ForkedData(the_value) 
     - The master makes sure to keep a reference to the ForkedData object 
      until the children are all done with it, since the global reference 
      is deleted to avoid memory leaks when the ForkedData object dies. 
     - Master process constructs a multiprocessing.Pool *after* 
      the ForkedData construction, so that the forked processes 
      inherit the new global. 
     - Master calls e.g. pool.map with data as an argument. 
     - Child gets the real value through data.value, and uses it read-only. 
    ''' 
    # TODO: does data really need to be used read-only? don't think so... 
    # TODO: more flexible garbage collection options 
    def __init__(self, val): 
        g = globals() 
        self.name = next(n for n in _data_name_cands if n not in g) 
        g[self.name] = val 
        self.master_pid = os.getpid() 

    @property 
    def value(self): 
        return globals()[self.name] 

    def __del__(self): 
        if os.getpid() == self.master_pid: 
            del globals()[self.name] 
Problemi correlati