2012-09-18 13 views
16

Ho uno script che include l'apertura di un file da un elenco e quindi fare qualcosa al testo all'interno di quel file. Sto usando Python multiprocessing e Pool per provare a parallelizzare questa operazione. Un'astrazione dello script è qui sotto:multiprocessing python apply_async utilizza solo un processo

import os 
from multiprocessing import Pool 

results = [] 
def testFunc(files): 
    for file in files: 
     print "Working in Process #%d" % (os.getpid()) 
     #This is just an illustration of some logic. This is not what I'm actually doing. 
     for line in file: 
      if 'dog' in line: 
       results.append(line) 

if __name__=="__main__": 
    p = Pool(processes=2) 
    files = ['/path/to/file1.txt', '/path/to/file2.txt'] 
    results = p.apply_async(testFunc, args = (files,)) 
    results2 = results.get() 

Quando eseguo questo la stampa della id processo è lo stesso per ogni iterazione. Fondamentalmente quello che sto cercando di fare è prendere ogni elemento della lista di input e distribuirlo in un processo separato, ma sembra che un processo stia facendo tutto il lavoro.

risposta

28
  • apply_async fa uscire un'azienda dalla piscina. Avresti bisogno di chiamare apply_async molte volte per esercitare più processori.
  • Non consentire a entrambi i processi di provare a scrivere sullo stesso elenco, results. Poiché i pool worker sono processi separati, i due non scriveranno nella stessa lista. Un modo per aggirare questo problema consiste nell'utilizzare una coda di output. Puoi configurarlo da solo o utilizzare la funzione di richiamata di apply_async per impostare la Coda. apply_async chiamerà il callback una volta completata la funzione.
  • È possibile utilizzare map_async anziché apply_async, ma in tal caso si dovrebbe ottenere per ottenere un elenco di elenchi, che sarà necessario appiattire.

Così, forse provare invece qualcosa di simile:

import os 
import multiprocessing as mp 

results = [] 

def testFunc(file): 
    result = [] 
    print "Working in Process #%d" % (os.getpid()) 
    # This is just an illustration of some logic. This is not what I'm 
    # actually doing. 
    with open(file, 'r') as f: 
     for line in f: 
      if 'dog' in line: 
       result.append(line) 
    return result 


def collect_results(result): 
    results.extend(result) 

if __name__ == "__main__": 
    p = mp.Pool(processes=2) 
    files = ['/path/to/file1.txt', '/path/to/file2.txt'] 
    for f in files: 
     p.apply_async(testFunc, args=(f,), callback=collect_results) 
    p.close() 
    p.join() 
    print(results) 
7

Forse in questo caso si dovrebbe usare map_async:

import os 
from multiprocessing import Pool 

results = [] 
def testFunc(file): 
    message = ("Working in Process #%d" % (os.getpid())) 
    #This is just an illustration of some logic. This is not what I'm actually doing. 
    for line in file: 
     if 'dog' in line: 
      results.append(line) 
    return message 

if __name__=="__main__": 
    print("saddsf") 
    p = Pool(processes=2) 
    files = ['/path/to/file1.txt', '/path/to/file2.txt'] 
    results = p.map_async(testFunc, files) 
    print(results.get()) 
+1

O forse solo 'map' se avete intenzione di' risultati .get() 'subito. – mgilson

+0

Apprezzo la risposta, ma sto cercando di restare con apply_async per vari motivi. – user1074057

Problemi correlati