Viene visualizzato un errore di sistema (mostrato di seguito) durante l'esecuzione di alcuni semplici calcoli di algebra di matrice basati su numpy in parallelo utilizzando il pacchetto Multiprocessing (python 2.73 con numpy 1.7.0 su Ubuntu 12.04 su Amazon EC2) . Il mio codice funziona bene per le dimensioni di matrice più piccoli, ma si blocca per quelli più grandi (con un sacco di memoria disponibile)Errore di sistema durante l'esecuzione di sottoprocessi mediante Multiprocessing
La dimensione delle matrici che uso è notevole (il mio codice viene eseguito bene per 1000000x10 galleggiante matrici denso ma si blocca per 1000000x500 quelli - I sto passando queste matrici a/da sottoprocessi a proposito). 10 vs 500 è un parametro run-time, tutto il resto rimane invariato (dati di input, altri parametri di run-time ecc.)
Ho anche provato a eseguire lo stesso codice (porting) utilizzando python3 - per matrici più grandi i sottoprocessi passano in modalità sleep/idle (invece di crashing come in python 2.7) e il programma/sottoprocessi si bloccano lì senza fare nulla. Per matrici più piccole il codice funziona bene con python3.
Tutti i suggerimenti sarebbero molto apprezzati (I sono a corto di idee qui)
messaggioerrore:
Exception in thread Thread-5: Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
self.run() File "/usr/lib/python2.7/threading.py", line 504, in run
self.__target(*self.__args, **self.__kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
put(task) SystemError: NULL result without error in PyObject_Call
Il codice Multiprocessing io uso:
def runProcessesInParallelAndReturn(proc, listOfInputs, nParallelProcesses):
if len(listOfInputs) == 0:
return
# Add result queue to the list of argument tuples.
resultQueue = mp.Manager().Queue()
listOfInputsNew = [(argumentTuple, resultQueue) for argumentTuple in listOfInputs]
# Create and initialize the pool of workers.
pool = mp.Pool(processes = nParallelProcesses)
pool.map(proc, listOfInputsNew)
# Run the processes.
pool.close()
pool.join()
# Return the results.
return [resultQueue.get() for i in range(len(listOfInputs))]
riportano di seguito le " proc "che viene eseguito per ogni sottoprocesso. Fondamentalmente, risolve molti sistemi di equazioni lineari usando numpy (costruisce le matrici richieste all'interno del sottoprocesso) e restituisce i risultati come un'altra matrice. Ancora una volta, funziona bene per valori più piccoli di un parametro di runtime ma si blocca (o si blocca in python3) per quelli più grandi.
def solveForLFV(param):
startTime = time.time()
(chunkI, LFVin, XY, sumLFVinOuterProductLFVallPlusPenaltyTerm, indexByIndexPurch, outerProductChunkSize, confWeight), queue = param
LFoutChunkSize = XY.shape[0]
nLFdim = LFVin.shape[1]
sumLFVinOuterProductLFVpurch = np.zeros((nLFdim, nLFdim))
LFVoutChunk = np.zeros((LFoutChunkSize, nLFdim))
for LFVoutIndex in xrange(LFoutChunkSize):
LFVInIndexListPurch = indexByIndexPurch[LFVoutIndex]
sumLFVinOuterProductLFVpurch[:, :] = 0.
LFVInIndexChunkLow, LFVInIndexChunkHigh = getChunkBoundaries(len(LFVInIndexListPurch), outerProductChunkSize)
for LFVInIndexChunkI in xrange(len(LFVInIndexChunkLow)):
LFVinSlice = LFVin[LFVInIndexListPurch[LFVInIndexChunkLow[LFVInIndexChunkI] : LFVInIndexChunkHigh[LFVInIndexChunkI]], :]
sumLFVinOuterProductLFVpurch += sum(LFVinSlice[:, :, np.newaxis] * LFVinSlice[:, np.newaxis, :])
LFVoutChunk[LFVoutIndex, :] = np.linalg.solve(confWeight * sumLFVinOuterProductLFVpurch + sumLFVinOuterProductLFVallPlusPenaltyTerm, XY[LFVoutIndex, :])
queue.put((chunkI, LFVoutChunk))
print 'solveForLFV: ', time.time() - startTime, 'sec'
sys.stdout.flush()
Puoi condividere il codice per la funzione proc? – barracel
Proprio fatto. Non ho descritto gli argomenti del proc: alcuni di essi sono matrici, alcuni sono elenchi di elenchi e alcuni sono semplicemente float/interi. 'queue' è usato per restituire i risultati da ciascun sottoprocesso. – Yevgeny