Update 1.0 inizio
Sembra quando la chiamataCome eseguire il ciclo parallelo in Python?
for i, Wi in enumerate(W.T):
idx.append(i)
result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,)))
gli argomenti passati alla funzione ALS_Y/ALS_X
non sono riferimenti, ha copiato l'arguments..So, quando X
o Y
è molto large matrixes
, come ad esempio, nel mio caso, è 6000*40
o così (ed è un for-loop
, supponiamo che il numero di iterazioni sia 50 000
, quindi ...), eccede il limite di memoria.
E poi ho provato con argomenti globali, solo di passaggio gli indici come parametri nelle funzioni,
import multiprocessing
import time
import numpy as np
def func(idx):
global a
a[idx] += 1
if __name__ == "__main__":
a=range(10)
for j in xrange(2):
pool = multiprocessing.Pool(processes=8)
result = []
for i in xrange(10):
result.append(pool.apply_async(func, (i,)))
pool.close()
pool.join()
print a
print "Sub-process(es) done."
essa stampa: `
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Sub-process(es) done.
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Sub-process(es) done.
So, this means it still copied
a`! Ora, mi chiedo se c'è un modo per gestire questo problema? Apprezzare!
Update 1.0 fine
Di seguito è riportato il mio codice in Python per risolvere il problema della matrice fattorizzazione. W = XY. Tuttavia, i codici seguenti non sono efficienti, e spero che possa essere convertito in versione parallela, utilizzando GPU è il migliore, anche la CPU va bene. Non ho esperienza di programmazione parallela, quindi qualcuno può darmi qualche consiglio?
Di seguito si riporta il codice per fattorizzare le matrici con ALS (alternanza dei minimi quadrati, dettagli here)
for ii in range(n_iterations):
for u, Wu in enumerate(W):
X[u] = np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors),
np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T #X_inner loop
for i, Wi in enumerate(W.T):
Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors), #Y_inner loop
np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))#Y_inner loop
error = get_error(Q, X, Y, W)
weighted_errors.append(error)
print '{}th iteration is completed'.format(ii)
dopo usato multiprocessing lib, il mio codice ora:
def ALS_X(Y, Wu, Q, lambda_, n_factors, u):
return np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors),
np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T
for ii in range(n_iterations):
pool = multiprocessing.Pool(processes=12)#create pool
result = []#store each row for X
idx = []#store the row number
for u, Wu in enumerate(W):
idx.append(u)
result.append(pool.apply_async(ALS_X, (Y, Wu, Q, lambda_, n_factors, u,)))
pool.close()
pool.join()
for u, vector in zip(idx, result):
X[u] = vector.get()#assign the result to X
######################################
pool = multiprocessing.Pool(processes=12)#for Y, much similar to X
result = []
idx = []
for i, Wi in enumerate(W.T):
idx.append(i)
result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,)))
pool.close()
pool.join()
for i, vector in zip(idx, result):
Y[:,i] = vector.get()
error = get_error(Q, X, Y, W)
weighted_errors.append(error)
print '{}th iteration is completed'.format(ii), 'error: ',error
ma un po ' miseria, il programma si arrestava sempre in modo silenzioso ...
Di seguito è l'intero mazzo del mio codice .. è tutto in disordine. Basta ignorare load_data
get_error
e vec2str
, dal momento che qui ho generare la matrice in modo casuale ..
import pandas as pd
import numpy as np
import multiprocessing
def vec2str(vec):
res = ''
for dim in len(vec):
res += str(vec[dim]) + ','
return res
def load_data(heads, filename, sep,header=None):
data = pd.read_table(filename, sep=sep, header=header, names=heads)
rp = data.pivot_table(columns=['sid'],index=['uid'],values=['rating'])#not generally...
Q = rp.fillna(0)
Q = Q.values
W = Q >0.5
W[W == True] = 1
W[W == False] = 0
W = W.astype(np.float64, copy=False)
return Q, W, rp
def get_error(Q, X, Y, W):
return np.sum((W * (Q - np.dot(X, Y)))**2)
'''
X[u] = np.linalg.solve(np.dot(, np.dot(np.diag(), .T)) + * np.eye(),
np.dot(, np.dot(np.diag(), Q[u].T))).T
'''
def ALS_X(Y, Wu, Q, lambda_, n_factors, u):
return np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors),
np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T
'''
Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors),
np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))
'''
def ALS_Y(X, Wi, Q, lambda_, n_factors, i):
return np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors),
np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))
if __name__ == "__main__":
lambda_ = 0.1
n_factors = 40
filename = 'data_songID'
n_iterations = 20
#Q, W, rp = load_data(['uid', 'sid', 'rating'], filename, ',')
Q = np.random.rand(1000,1000)
m, n = Q.shape
W = np.eye(1000)
print 'Loading data finished, ', 'size: ', Q.shape
print 'Settings ', 'lambda = {}'.format(lambda_), 'n_factors = {}'.format(n_factors)
X = 5 * np.random.rand(m, n_factors)
Y = 5 * np.random.rand(n_factors, n)
errors = []
for ii in range(n_iterations):
X = np.linalg.solve(np.dot(Y, Y.T) + lambda_ * np.eye(n_factors),
np.dot(Y, Q.T)).T
Y = np.linalg.solve(np.dot(X.T, X) + lambda_ * np.eye(n_factors),
np.dot(X.T, Q))
if ii % 100 == 0:
print('{}th iteration is completed'.format(ii))
errors.append(get_error(Q, X, Y, W))
Q_hat = np.dot(X, Y)
print('Error of rated movies: {}'.format(get_error(Q, X, Y, W)))
print errors
#####ALS start....#####
print '*'*100
weighted_errors = []
for ii in range(n_iterations):
pool = multiprocessing.Pool(processes=12)
result = []
idx = []
for u, Wu in enumerate(W):
idx.append(u)
result.append(pool.apply_async(ALS_X, (Y, Wu, Q, lambda_, n_factors, u,)))
pool.close()
pool.join()
for u, vector in zip(idx, result):
X[u] = vector.get()
######################################
pool = multiprocessing.Pool(processes=12)
result = []
idx = []
for i, Wi in enumerate(W.T):
idx.append(i)
result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,)))
pool.close()
pool.join()
for i, vector in zip(idx, result):
Y[:,i] = vector.get()
error = get_error(Q, X, Y, W)
weighted_errors.append(error)
print '{}th iteration is completed'.format(ii), 'error: ',error
weighted_Q_hat = np.dot(X,Y)
print weighted_errors
X.tofile('X.bin')
Y.tofile('Y.bin')
latent_user_file = open('user_latent','w')
for idx in len(rp.axes[0]):
latent_user_file.write(str(rp.axes[0][idx]) + '\t' + vec2str(X[idx,:]) + '\n')
latent_mid_file = open('mid_latent', 'w')
for idx in len(rp.axes[1]):
latent_mid_file.write(str(rp.axes[1][idx]) + '\t' + vec2str(Y.T[idx,:]) + '\n')
perché non si creano 2 thread utilizzando il modulo multithreading e quindi li si unisce –
@binayr: i thread in Python sono soggetti al Global Interpreter Lock, ma un'estensione C come NumPy può essere in grado di eseguire quantità significative di lavoro senza tenere il GIL, quindi è difficile dire quanto aiuto sarebbe. – Kevin
Poiché il ciclo Y_inner deve essere eseguito dopo il ciclo X_inner. Ma ogni iterazione nel ciclo X_inner può essere eseguita simultaneamente. E spero che il ciclo X_inner possa essere eseguito parallelamente ... La mia macchina è una 12-CPU e 3-GPU ... Quindi, spero di poterli utilizzare appieno, ma non sono così chiaro su come farlo .. triste .. –