2009-12-11 24 views

risposta

59

vi consiglio di un'istanza di un Queue.Queue prima di iniziare la discussione, e passa come uno dei args del thread: prima del termine del thread, .put s il risultato sulla coda ha ricevuto come argomento. Il genitore può .get o .get_nowait a volontà.

code sono generalmente il modo migliore per organizzare la sincronizzazione dei thread e la comunicazione in Python: sono intrinsecamente thread-safe, veicoli message-passing - il modo migliore per organizzare multitasking in generale -)

+2

'prima che il thread finisca, restituisce il risultato sulla coda che ha ricevuto come argomento' vuoi dire che verrà eseguito automaticamente da python? se no (inteso come suggerimento di progettazione), allora potresti chiarire la risposta. – n611x007

+3

È brutto per specializzare una funzione esistente per quello; e la coda ha un sovraccarico non necessario per un singolo problema di risultati. Più chiaramente ed efficientemente sottoclasse 'threading.Thread' e il nuovo metodo run() memorizza semplicemente il risultato come attributo come' self.ret = ... '(Molto più comodo sarebbe una sottoclasse di Thread che gestisce i valori di ritorno/eccezioni di la funzione di destinazione personalizzata. Infatti 'threading.Thread' dovrebbe essere esteso per offrire quello fuori dalla scatola - come sarebbe compatibile con il vecchio comportamento" return None ".) – kxr

1

Bene, nel modulo di threading Python ci sono oggetti condizione associati ai blocchi. Un metodo acquire() restituirà qualsiasi valore restituito dal metodo sottostante. Per maggiori informazioni: Python Condition Objects

5

Un altro approccio! è passare una funzione di callback al thread. Questo fornisce un modo semplice, sicuro e flessibile per restituire un valore al genitore, in qualsiasi momento dal nuovo thread.

# A sample implementation 

import threading 
import time 

class MyThread(threading.Thread): 
    def __init__(self, cb): 
     threading.Thread.__init__(self) 
     self.callback = cb 

    def run(self): 
     for i in range(10): 
      self.callback(i) 
      time.sleep(1) 


# test 

import sys 

def count(x): 
    print x 
    sys.stdout.flush() 

t = MyThread(count) 
t.start() 
+8

Il problema con questo è che il callback è ancora in esecuzione nel thread figlio, piuttosto che nel thread originale. – babbageclunk

+0

@wilberforce potresti spiegare quali problemi può causare? –

+4

Ok. Un esempio potrebbe essere se il callback scrive in un file di log a cui il thread padre scrive anche mentre il thread è in esecuzione. Poiché il callback è in esecuzione nel thread figlio, c'è il rischio che le due scritture si verifichino contemporaneamente e collidano: si potrebbe ottenere un output confuso o interfogliato o un arresto anomalo se il framework di registrazione esegue una contabilità interna. Usando una coda thread-safe e avendo un thread fare tutta la scrittura eviterebbe questo. Questi tipi di problemi possono essere sgradevoli perché non sono deterministici: potrebbero apparire solo nella produzione e possono essere difficili da riprodurre. – babbageclunk

12

Se stavi chiamando join() per attendere il filo per completare, si può semplicemente collegare il risultato all'istanza Discussione stesso e poi recuperarlo dal thread principale dopo le unirsi rendimenti().

D'altra parte, non ci si dice come si intende scoprire che il thread è terminato e che il risultato è disponibile. Se hai già un modo per farlo, probabilmente ti indicherà (e noi, se dovessi dircelo) il modo migliore per ottenere i risultati.

+0

* si può semplicemente allegare il risultato all'istanza Thread stessa * Come si passa l'istanza Thread alla destinazione che viene eseguita in modo che la destinazione possa allegare il risultato a questa istanza? –

+1

Piotr Dobrogost, se non stai creando una sottoclasse di Thread per la tua istanza, puoi semplicemente utilizzare threading.current_thread() dalla fine del callable target. Lo chiamerei un po 'brutto, ma l'approccio di Alex era sempre il più elegante. Questo è solo più utile in alcuni casi. –

+3

Sarebbe bello se 'join()' restituisse solo il metodo restituito dal metodo chiamato ... sembra sciocco che invece restituisca 'None'. – ArtOfWarfare

10

Si dovrebbe passare un'istanza Queue come parametro, quindi si dovrebbe .put() l'oggetto restituito nella coda. È possibile raccogliere il valore di ritorno tramite queue.get() qualunque oggetto si metta.

Esempio:

queue = Queue.Queue() 
thread_ = threading.Thread(
       target=target_method, 
       name="Thread1", 
       args=[params, queue], 
       ) 
thread_.start() 
thread_.join() 
queue.get() 

def target_method(self, params, queue): 
""" 
Some operations right here 
""" 
your_return = "Whatever your object is" 
queue.put(your_return) 

Uso per più thread:

#Start all threads in thread pool 
    for thread in pool: 
     thread.start() 
     response = queue.get() 
     thread_results.append(response) 

#Kill all threads 
    for thread in pool: 
     thread.join() 

Io uso questa implementazione e funziona benissimo per me. Ti auguro di farlo.

+1

Non ti manca il tuo thread_.start() ?? – sadmicrowave

+1

Naturalmente inizio il thread che mi manca per mettere la linea qui :) Grazie per l'avviso. –

+0

come sarebbe se tu avessi più thread? que.get() restituisce il risultato di un thread solo per me? – ABros

6

Usa lambda per avvolgere la vostra funzione thread di destinazione e passare il suo valore di ritorno di nuovo al thread genitore con una coda. (La funzione di destinazione originale rimane invariato senza parametro coda supplementare.) Codice

Esempio:

import threading 
import queue 
def dosomething(param): 
    return param * 2 
que = queue.Queue() 
thr = threading.Thread(target = lambda q, arg : q.put(dosomething(arg)), args = (que, 2)) 
thr.start() 
thr.join() 
while not que.empty(): 
    print(que.get()) 

uscita:

4 
2

POC:

import random 
import threading 

class myThread(threading.Thread): 
    def __init__(self, arr): 
     threading.Thread.__init__(self) 
     self.arr = arr 
     self.ret = None 

    def run(self): 
     self.myJob(self.arr) 

    def join(self): 
     threading.Thread.join(self) 
     return self.ret 

    def myJob(self, arr): 
     self.ret = sorted(self.arr) 
     return 

#Call the main method if run from the command line. 
if __name__ == '__main__': 
    N = 100 

    arr = [ random.randint(0, 100) for x in range(N) ] 
    th = myThread(arr) 
    th.start() 
    sortedArr = th.join() 

    print "arr2: ", sortedArr 
3

È possibile utilizzare sincronizzato queue modulo.
Considerate è necessario controllare a informazioni degli utenti da database con un ID di nota:

def check_infos(user_id, queue): 
    result = send_data(user_id) 
    queue.put(result) 

Ora è possibile ottenere i dati in questo modo:

import queue, threading 
queued_request = queue.Queue() 
check_infos_thread = threading.Thread(target=check_infos, args=(user_id, queued_request)) 
check_infos_thread.start() 
final_result = queued_request.get() 
6

Sono sorpreso nessuno detto che si poteva solo passare un mutabile:

>>> thread_return={'success': False} 
>>> from threading import Thread 
>>> def task(thread_return): 
... thread_return['success'] = True 
... 
>>> Thread(target=task, args=(thread_return,)).start() 
>>> thread_return 
{'success': True} 

forse questo ha grandi problemi di cui non sono a conoscenza.

+1

Funziona perfettamente! Mi piacerebbe davvero sentire qualche opinione sulle cose che mancano con questo approccio, se ce ne sono. –

+0

funziona. È proprio brutto specializzarsi in una funzione esistente - e in quelle molte cose confuse (leggibilità) - vedere il commento sulla prima risposta. – kxr

+0

come va con il multi-thread? – backslash112

0

La seguente funzione wrapper avvolgerà una funzione esistente e restituirà un oggetto che punta sia al thread (in modo che sia possibile chiamare start(), join(), ecc.) Sia ad accedere/visualizzare il suo valore di ritorno finale.

def threadwrap(func,args,kwargs): 
    class res(object): result=None 
    def inner(*args,**kwargs): 
    res.result=func(*args,**kwargs) 
    import threading 
    t = threading.Thread(target=inner,args=args,kwargs=kwargs) 
    res.thread=t 
    return res 

def myFun(v,debug=False): 
    import time 
    if debug: print "Debug mode ON" 
    time.sleep(5) 
    return v*2 

x=threadwrap(myFun,[11],{"debug":True}) 
x.thread.start() 
x.thread.join() 
print x.result 

sembra ok, e la classe threading.Thread sembra essere facilmente esteso (*) con questo tipo di funzionalità, quindi mi chiedo il motivo per cui non è già presente. C'è un difetto con il metodo sopra?

(*) Si noti che la risposta di husanu per questa domanda fa esattamente questo, sottoclasse threading.Thread risultante in una versione in cui join() fornisce il valore restituito.

1

In base al suggerimento di jcomeau_ictx. Il più semplice che ho trovato. Il requisito qui era quello di ottenere lo status di uscita stabile da tre diversi processi in esecuzione sul server e attivare un altro script se tutti e tre hanno avuto successo. Questo sembra funzionare bene

class myThread(threading.Thread): 
     def __init__(self,threadID,pipePath,resDict): 
      threading.Thread.__init__(self) 
      self.threadID=threadID 
      self.pipePath=pipePath 
      self.resDict=resDict 

     def run(self): 
      print "Starting thread %s " % (self.threadID) 
      if not os.path.exists(self.pipePath): 
      os.mkfifo(self.pipePath) 
      pipe_fd = os.open(self.pipePath, os.O_RDWR | os.O_NONBLOCK) 
      with os.fdopen(pipe_fd) as pipe: 
       while True: 
        try: 
        message = pipe.read() 
        if message: 
         print "Received: '%s'" % message 
         self.resDict['success']=message 
         break 
        except: 
         pass 

    tResSer={'success':'0'} 
    tResWeb={'success':'0'} 
    tResUisvc={'success':'0'} 


    threads = [] 

    pipePathSer='/tmp/path1' 
    pipePathWeb='/tmp/path2' 
    pipePathUisvc='/tmp/path3' 

    th1=myThread(1,pipePathSer,tResSer) 
    th2=myThread(2,pipePathWeb,tResWeb) 
    th3=myThread(3,pipePathUisvc,tResUisvc) 

    th1.start() 
    th2.start() 
    th3.start() 

    threads.append(th1) 
    threads.append(th2) 
    threads.append(th3) 

    for t in threads: 
     print t.join() 

    print "Res: tResSer %s tResWeb %s tResUisvc %s" % (tResSer,tResWeb,tResUisvc) 
    # The above statement prints updated values which can then be further processed