2014-09-29 8 views
11

Ho cercato qui su come fare il threading in python, ma di gran lunga non sono stato in grado di ottenere la risposta di cui ho bisogno. Non ho molta familiarità con le classi python di Queue e Threading e per questo motivo alcuni degli anwser presenti qui non hanno alcun senso per me.Come ottenere i risultati da un pool di thread in python?

Voglio creare un pool di thread che posso dare un compito diverso e quando tutti hanno finito ottengono i valori dei risultati e li elaborano. Finora ho provato a farlo, ma non sono in grado di ottenere i risultati. Il codice che ho scritto è:

from threading import Thread 
from Queue import Queue 

class Worker(Thread): 
    """Thread executing tasks from a given tasks queue""" 
    def __init__(self, tasks): 
     Thread.__init__(self) 
     self.tasks = tasks 
     self.daemon = True 
     self.result = None 
     self.start() 
    def run(self): 
     while True: 
      func, args, kargs = self.tasks.get() 
      try: 
       self.result = func(*args, **kargs) 
      except Exception, e: 
       print e 
      self.tasks.task_done() 
    def get_result(self): 
     return self.result 

class ThreadPool: 
    """Pool of threads consuming tasks from a queue""" 
    def __init__(self, num_threads): 
     self.tasks = Queue(num_threads) 
     self.results = [] 
     for _ in range(num_threads): 
      w = Worker(self.tasks) 
      self.results.append(w.get_result()) 
    def add_task(self, func, *args, **kargs): 
     """Add a task to the queue""" 
     self.tasks.put((func, args, kargs)) 
    def wait_completion(self): 
     """Wait for completion of all the tasks in the queue""" 
     self.tasks.join() 
    def get_results(self): 
     return self.results 

def foo(word, number): 
    print word*number 
    return number 

words = ['hello', 'world', 'test', 'word', 'another test'] 
numbers = [1,2,3,4,5] 
pool = ThreadPool(5) 
for i in range(0, len(words)): 
    pool.add_task(foo, words[i], numbers[i]) 

pool.wait_completion() 
results = pool.get_results() 
print results 

Le stampe in uscita le corde con parola data volte il numero dato, ma la lista dei risultati è pieno di valori Nessuno, quindi dove dovrei mettere i valori di ritorno del func.

O il modo semplice è creare una lista in cui riempio la coda e aggiungi un dizionario o qualche variabile per memorizzare il risultato come argomento alla mia funzione, e dopo che l'attività è stata aggiunta alla coda aggiungi questo argomento risultato un elenco di risultati:

def foo(word, number, r): 
    print word*number 
    r[(word,number)] = number 
    return number 

words = ['hello', 'world', 'test', 'word', 'another test'] 
numbers = [1,2,3,4,5] 
pool = ThreadPool(5) 
results = [] 
for i in range(0, len(words)): 
    r = {} 
    pool.add_task(foo, words[i], numbers[i], r) 
    results.append(r) 
print results 

Sarò molto grato per il vostro aiuto.

risposta

9

Python in realtà ha un built-in pool di thread è possibile utilizzare, its just not well documented:

from multiprocessing.pool import ThreadPool 

def foo(word, number): 
    print (word * number) 
    r[(word,number)] = number 
    return number 

words = ['hello', 'world', 'test', 'word', 'another test'] 
numbers = [1,2,3,4,5] 
pool = ThreadPool(5) 
results = [] 
for i in range(0, len(words)): 
    results.append(pool.apply_async(foo, args=(words[i], numbers[i]))) 

pool.close() 
pool.join() 
results = [r.get() for r in results] 
print results 

Or (usando map invece di apply_async):

from multiprocessing.pool import ThreadPool 

def foo(word, number): 
    print word*number 
    return number 

def starfoo(args): 
    """ 

    We need this because map only supports calling functions with one arg. 
    We need to pass two args, so we use this little wrapper function to 
    expand a zipped list of all our arguments. 

    """  
    return foo(*args) 

words = ['hello', 'world', 'test', 'word', 'another test'] 
numbers = [1,2,3,4,5] 
pool = ThreadPool(5) 
# We need to zip together the two lists because map only supports calling functions 
# with one argument. In Python 3.3+, you can use starmap instead. 
results = pool.map(starfoo, zip(words, numbers)) 
print results 

pool.close() 
pool.join() 
+0

Il secondo caso sarebbe utile quando il numero di attività è uguale alla dimensione della piscina, vero? –

+0

Funzionerà bene con qualsiasi numero di attività e con un 'Pool' con un numero qualsiasi di worker. 'map' è utile se vuoi eseguire una funzione rispetto a tutti gli elementi di un iterable e restituire i risultati di ogni chiamata. Se hai 5 lavoratori per gestire un iterabile di lunghezza 100, il 'Pool' chiamerà la funzione contro tutti i 100 elementi, ma non eseguirà mai più di 5 thread simultaneamente. L'output sarà un iterable della lunghezza 100, con il valore risultante di tutte le chiamate di funzione. – dano

+1

@RafaelRios Un'altra nota, a causa della [GIL] (https://wiki.python.org/moin/GlobalInterpreterLock), l'utilizzo di thread per eseguire il lavoro con CPU in Python non ha alcun vantaggio in termini di prestazioni. Per ovviare a questa limitazione, è necessario utilizzare più processi tramite il modulo ['multiprocessing'] (https://docs.python.org/2.7/library/multiprocessing.html). Per l'esempio precedente, puoi effettuare il passaggio usando 'dal lotto di importazione multiprocessing' invece di' da multiprocessing.pool import ThreadPool'. Tutto il resto rimane lo stesso. – dano

Problemi correlati