2009-07-15 16 views
34

Ho un programma multithread in cui creo una funzione di generatore e poi la passo a nuovi thread. Voglio che sia condiviso/globale in natura in modo che ogni thread possa ottenere il valore successivo dal generatore.I generatori sono thread-safe?

È sicuro usare un generatore come questo, o mi imbatterò in problemi/condizioni che accedono al generatore condiviso da più thread?

In caso contrario, c'è un modo migliore per affrontare il problema? Ho bisogno di qualcosa che scorrerà ciclicamente un elenco e produrrà il valore successivo per qualsiasi thread lo chiami.

risposta

49

Non è thread-safe; le chiamate simultanee possono alternarsi e interferire con le variabili locali.

L'approccio comune consiste nell'utilizzare il modello master-slave (ora chiamato pattern farmer-worker in PC). Crea un terzo thread che genera dati e aggiungi una coda tra il master e gli slave, dove gli slave leggeranno dalla coda e il master scriverà su di esso. Il modulo di coda standard fornisce la necessaria sicurezza del thread e organizza il blocco del master fino a quando gli slave non sono pronti a leggere più dati.

+7

Sicuramente +1 per Queue.Queue, ottimo modo per organizzare sistema filettatura quando applicabile (che è il più delle volte, e sicuramente per questo compito). –

-7

Dipende da quale implementazione di Python che si sta utilizzando. In CPython, GIL rende tutte le operazioni sugli oggetti python threadsafe, poiché solo un thread può eseguire codice in qualsiasi momento.

http://en.wikipedia.org/wiki/Global_Interpreter_Lock

+1

"il GIL rende tutte le operazioni su oggetti python threadsafe" - eh? tutte le operazioni non sono atomiche –

+6

Ciò è pericolosamente fuorviante. Il GIL significa solo che il codice Python non corromperà lo stato di Python in un ambiente con multithreading: non puoi cambiare i thread nel mezzo di un op bytecode. Ad esempio, è possibile modificare un dettato condiviso senza corromperlo. È comunque possibile modificare i thread tra due operazioni optecode. –

40

A cura di aggiungere punto di riferimento al di sotto.

È possibile avvolgere un generatore con un blocco. Ad esempio,

import threading 
class LockedIterator(object): 
    def __init__(self, it): 
     self.lock = threading.Lock() 
     self.it = it.__iter__() 

    def __iter__(self): return self 

    def next(self): 
     self.lock.acquire() 
     try: 
      return self.it.next() 
     finally: 
      self.lock.release() 

gen = [x*2 for x in [1,2,3,4]] 
g2 = LockedIterator(gen) 
print list(g2) 

blocco prende 50ms sul mio sistema, Coda prende 350ms. La coda è utile quando si ha realmente una coda; ad esempio, se hai richieste HTTP in arrivo e vuoi metterle in coda per l'elaborazione tramite thread di lavoro. (Questo non si adatta al modello di iteratore di Python - una volta che un iteratore ha esaurito gli elementi, è fatto.) Se hai davvero un iteratore, allora LockedIterator è un modo più rapido e semplice per renderlo sicuro.

from datetime import datetime 
import threading 
num_worker_threads = 4 

class LockedIterator(object): 
    def __init__(self, it): 
     self.lock = threading.Lock() 
     self.it = it.__iter__() 

    def __iter__(self): return self 

    def next(self): 
     self.lock.acquire() 
     try: 
      return self.it.next() 
     finally: 
      self.lock.release() 

def test_locked(it): 
    it = LockedIterator(it) 
    def worker(): 
     try: 
      for i in it: 
       pass 
     except Exception, e: 
      print e 
      raise 

    threads = [] 
    for i in range(num_worker_threads): 
     t = threading.Thread(target=worker) 
     threads.append(t) 
     t.start() 

    for t in threads: 
     t.join() 

def test_queue(it): 
    from Queue import Queue 
    def worker(): 
     try: 
      while True: 
       item = q.get() 
       q.task_done() 
     except Exception, e: 
      print e 
      raise 

    q = Queue() 
    for i in range(num_worker_threads): 
     t = threading.Thread(target=worker) 
     t.setDaemon(True) 
     t.start() 

    t1 = datetime.now() 

    for item in it: 
     q.put(item) 

    q.join() 

start_time = datetime.now() 
it = [x*2 for x in range(1,10000)] 

test_locked(it) 
#test_queue(it) 
end_time = datetime.now() 
took = end_time-start_time 
print "took %.01f" % ((took.seconds + took.microseconds/1000000.0)*1000) 
+1

Meno efficiente quindi utilizzando un Queue.Queue, ma ben fatto. – gooli