2016-06-02 26 views
5

Update 1.0 inizio

Sembra quando la chiamataCome eseguire il ciclo parallelo in Python?

for i, Wi in enumerate(W.T): 
    idx.append(i) 
    result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,))) 

gli argomenti passati alla funzione ALS_Y/ALS_X non sono riferimenti, ha copiato l'arguments..So, quando X o Y è molto large matrixes, come ad esempio, nel mio caso, è 6000*40 o così (ed è un for-loop, supponiamo che il numero di iterazioni sia 50 000, quindi ...), eccede il limite di memoria.
E poi ho provato con argomenti globali, solo di passaggio gli indici come parametri nelle funzioni,

import multiprocessing 
import time 
import numpy as np 

def func(idx): 
    global a 
    a[idx] += 1 



if __name__ == "__main__": 
    a=range(10) 
    for j in xrange(2): 
     pool = multiprocessing.Pool(processes=8) 
     result = [] 
     for i in xrange(10): 
      result.append(pool.apply_async(func, (i,))) 
     pool.close() 
     pool.join() 
     print a 
     print "Sub-process(es) done." 

essa stampa: `

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 
Sub-process(es) done. 
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 
Sub-process(es) done. 

So, this means it still copied a`! Ora, mi chiedo se c'è un modo per gestire questo problema? Apprezzare!

Update 1.0 fine


Di seguito è riportato il mio codice in Python per risolvere il problema della matrice fattorizzazione. W = XY. Tuttavia, i codici seguenti non sono efficienti, e spero che possa essere convertito in versione parallela, utilizzando GPU è il migliore, anche la CPU va bene. Non ho esperienza di programmazione parallela, quindi qualcuno può darmi qualche consiglio?

Di seguito si riporta il codice per fattorizzare le matrici con ALS (alternanza dei minimi quadrati, dettagli here)

for ii in range(n_iterations): 
    for u, Wu in enumerate(W): 
     X[u] = np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors), 
           np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T #X_inner loop 

    for i, Wi in enumerate(W.T): 
     Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors), #Y_inner loop 
           np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))#Y_inner loop 
    error = get_error(Q, X, Y, W) 
    weighted_errors.append(error) 
    print '{}th iteration is completed'.format(ii) 

dopo usato multiprocessing lib, il mio codice ora:

def ALS_X(Y, Wu, Q, lambda_, n_factors, u): 
return np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors), 
          np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T 

for ii in range(n_iterations): 
pool = multiprocessing.Pool(processes=12)#create pool 
result = []#store each row for X 
idx = []#store the row number 
for u, Wu in enumerate(W): 
    idx.append(u) 
    result.append(pool.apply_async(ALS_X, (Y, Wu, Q, lambda_, n_factors, u,))) 
pool.close() 
pool.join() 
for u, vector in zip(idx, result): 
    X[u] = vector.get()#assign the result to X 
###################################### 
pool = multiprocessing.Pool(processes=12)#for Y, much similar to X 
result = [] 
idx = [] 
for i, Wi in enumerate(W.T): 
    idx.append(i) 
    result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,))) 
pool.close() 
pool.join() 
for i, vector in zip(idx, result): 
    Y[:,i] = vector.get() 
error = get_error(Q, X, Y, W) 
weighted_errors.append(error) 
print '{}th iteration is completed'.format(ii), 'error: ',error 

ma un po ' miseria, il programma si arrestava sempre in modo silenzioso ...

Di seguito è l'intero mazzo del mio codice .. è tutto in disordine. Basta ignorare load_dataget_error e vec2str, dal momento che qui ho generare la matrice in modo casuale ..

import pandas as pd 
import numpy as np 
import multiprocessing 

def vec2str(vec): 
    res = '' 
    for dim in len(vec): 
     res += str(vec[dim]) + ',' 
    return res 

def load_data(heads, filename, sep,header=None): 
    data = pd.read_table(filename, sep=sep, header=header, names=heads) 
    rp = data.pivot_table(columns=['sid'],index=['uid'],values=['rating'])#not generally... 
    Q = rp.fillna(0) 
    Q = Q.values 
    W = Q >0.5 
    W[W == True] = 1 
    W[W == False] = 0 
    W = W.astype(np.float64, copy=False) 
    return Q, W, rp 

def get_error(Q, X, Y, W): 
    return np.sum((W * (Q - np.dot(X, Y)))**2) 

''' 
X[u] = np.linalg.solve(np.dot(, np.dot(np.diag(), .T)) + * np.eye(), 
           np.dot(, np.dot(np.diag(), Q[u].T))).T 
''' 
def ALS_X(Y, Wu, Q, lambda_, n_factors, u): 
    return np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors), 
           np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T 

''' 
Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors), 
           np.dot(X.T, np.dot(np.diag(Wi), Q[:, i]))) 
''' 

def ALS_Y(X, Wi, Q, lambda_, n_factors, i): 
    return np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors), 
           np.dot(X.T, np.dot(np.diag(Wi), Q[:, i]))) 



if __name__ == "__main__": 

    lambda_ = 0.1 
    n_factors = 40 
    filename = 'data_songID' 
    n_iterations = 20 
    #Q, W, rp = load_data(['uid', 'sid', 'rating'], filename, ',') 
    Q = np.random.rand(1000,1000) 
    m, n = Q.shape 
    W = np.eye(1000) 
    print 'Loading data finished, ', 'size: ', Q.shape 
    print 'Settings ', 'lambda = {}'.format(lambda_), 'n_factors = {}'.format(n_factors) 
    X = 5 * np.random.rand(m, n_factors) 
    Y = 5 * np.random.rand(n_factors, n) 
    errors = [] 
    for ii in range(n_iterations): 
     X = np.linalg.solve(np.dot(Y, Y.T) + lambda_ * np.eye(n_factors), 
         np.dot(Y, Q.T)).T 
     Y = np.linalg.solve(np.dot(X.T, X) + lambda_ * np.eye(n_factors), 
         np.dot(X.T, Q)) 
     if ii % 100 == 0: 
      print('{}th iteration is completed'.format(ii)) 
     errors.append(get_error(Q, X, Y, W)) 
     Q_hat = np.dot(X, Y) 
     print('Error of rated movies: {}'.format(get_error(Q, X, Y, W))) 
    print errors 
    #####ALS start....##### 
    print '*'*100 
    weighted_errors = [] 
    for ii in range(n_iterations): 
     pool = multiprocessing.Pool(processes=12) 
     result = [] 
     idx = [] 
     for u, Wu in enumerate(W): 
      idx.append(u) 
      result.append(pool.apply_async(ALS_X, (Y, Wu, Q, lambda_, n_factors, u,))) 
     pool.close() 
     pool.join() 
     for u, vector in zip(idx, result): 
      X[u] = vector.get() 
     ###################################### 
     pool = multiprocessing.Pool(processes=12) 
     result = [] 
     idx = [] 
     for i, Wi in enumerate(W.T): 
      idx.append(i) 
      result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,))) 
     pool.close() 
     pool.join() 
     for i, vector in zip(idx, result): 
      Y[:,i] = vector.get() 
     error = get_error(Q, X, Y, W) 
     weighted_errors.append(error) 
     print '{}th iteration is completed'.format(ii), 'error: ',error 

    weighted_Q_hat = np.dot(X,Y) 
    print weighted_errors 
    X.tofile('X.bin') 
    Y.tofile('Y.bin') 
    latent_user_file = open('user_latent','w') 
    for idx in len(rp.axes[0]): 
     latent_user_file.write(str(rp.axes[0][idx]) + '\t' + vec2str(X[idx,:]) + '\n') 

    latent_mid_file = open('mid_latent', 'w') 
    for idx in len(rp.axes[1]): 
     latent_mid_file.write(str(rp.axes[1][idx]) + '\t' + vec2str(Y.T[idx,:]) + '\n') 
+0

perché non si creano 2 thread utilizzando il modulo multithreading e quindi li si unisce –

+1

@binayr: i thread in Python sono soggetti al Global Interpreter Lock, ma un'estensione C come NumPy può essere in grado di eseguire quantità significative di lavoro senza tenere il GIL, quindi è difficile dire quanto aiuto sarebbe. – Kevin

+0

Poiché il ciclo Y_inner deve essere eseguito dopo il ciclo X_inner. Ma ogni iterazione nel ciclo X_inner può essere eseguita simultaneamente. E spero che il ciclo X_inner possa essere eseguito parallelamente ... La mia macchina è una 12-CPU e 3-GPU ... Quindi, spero di poterli utilizzare appieno, ma non sono così chiaro su come farlo .. triste .. –

risposta

1

L'anno scorso ho incontrato il vostro desiderio di un "loop parallelo" in pitone, e fatto una come parte del mio lavoro per una fisica carta. Ci sono molti moduli che fanno quello che vuoi, ma ho scoperto che potevo davvero farlo funzionare solo con pp nel modo che volevo per le funzioni arbitrarie.

Se si desidera qualcosa che assomiglia a questo:

ResultList = Library_ParallelLoop.Main(
    Function = ExampleFunction, 
    ListOfArgSets = ListOfArgSets, 
    Algorithm = 'pp', 
    PrintExtra = True 
    ) 

Poi si punta al mio hub git invece di fornire tutta la mia fonte in questo post come l'implementazione di ottenere in realtà di lavoro è stato dolorosamente molte linee, e ha coinvolto la copia profonda delle funzioni Python che apparentemente è qualcos'altro che non è stato pre-costruito molto bene in Python.

Finding Primes Esempio:

https://github.com/douglasquincyadams/Main/blob/master/Test_ParallelLoop.py

Repo:

https://github.com/douglasquincyadams/Main

Se si scarica il mio repo in qualche angolo buio del computer - poi y il nostro frammento di lavoro dovrebbe essere:

import Library_ParallelLoop 

def do_the_thing_function(ii): 
    for u, Wu in enumerate(W): 
    X[u] = np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors), 
          np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T #X_inner loop 

    for i, Wi in enumerate(W.T): 
    Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors), #Y_inner loop 
          np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))#Y_inner loop 
    error = get_error(Q, X, Y, W) 
    weighted_errors.append(error) 
    print '{}th iteration is completed'.format(ii) 
    return #whatever your result is supposed to be... your code doesn't work on its own 

ListOfArgSets = [] 
for ii in range(n_iterations): 
    ListOfArgSets.append( { "ii" : ii , } ) 

ResultList = Library_ParallelLoop.Main(
    Function = do_the_thing_function, 
    ListOfArgSets = ListOfArgSets, 
    Algorithm = 'pp', 
    PrintExtra = True 
    ) 

Se mi ha chiesto - un ciclo parallelo molto simile a quello di cui sopra dovrebbe già essere qualcosa che è bello a portata di mano e incorporati in lingue, ma sembra sempre di essere in qualche modo criptico exampled da maghi in una torre e non funziona abbastanza quando lo provi sul tuo schifoso laptop. Comunque, spero che questo aiuti.

Nota aggiuntiva - Vorrei anche suggerire che, se si vuole risolvere un problema di parallelizzazione su larga scala arbitraria (qualcosa di più di semplici loop), che si utilizza MPI perché ha tutti i tipi di campane e fischietti che possono permettere processi per parlare l'un l'altro a metà percorso. L'MPI è ciò che la gente ama utilizzare per le più grandi simulazioni, quindi i cluster di dimensioni maggiori progettati per gestire lavori di grandi dimensioni (~ 10k + core) supportano tutti MPI e sono certamente improbabili che supportino il pp o il modulo multiprocessing. Se vuoi semplicemente utilizzare tutti i core del tuo PC (o alcuni PC su una rete), scegli quello più semplice per lavorare.

Problemi correlati