2013-05-31 13 views
9

Sto cercando una classe Python (preferibilmente parte della lingua standard, piuttosto che una libreria di terze parti) per gestire la messaggistica asincrona dello "stile broadcast".È necessaria una coda di messaggi asincroni thread-safe

Avrò un thread che metterà i messaggi in coda (il metodo 'putMessageOnQueue' non deve bloccare) e quindi più altri thread che attenderanno tutti i messaggi, presumibilmente chiamato funzione di blocco 'waitForMessage'. Quando un messaggio viene inserito nella coda, voglio che ognuno dei thread in attesa ottenga la propria copia del messaggio.

Ho esaminato la classe integrata Queue, ma non penso che sia adatta perché il consumo di messaggi sembra implicare la loro rimozione dalla coda, quindi solo 1 thread client vedrebbe ciascuno.

Questo sembra che dovrebbe essere un caso d'uso comune, qualcuno può consigliare una soluzione?

+0

I credi di poter costruire una tua classe che tenga traccia di quale thread ha ricevuto quel messaggio, senza molti problemi. – Bakuriu

risposta

7

Penso che l'approccio tipico a questo è quello di utilizzare una coda di messaggi separata per ogni thread e spingere il messaggio su ogni coda che ha precedentemente registrato un interesse a ricevere tali messaggi.

Qualcosa del genere dovrebbe funzionare, ma è codice non testato ...

from time import sleep 
from threading import Thread 
from Queue import Queue 

class DispatcherThread(Thread): 

    def __init__(self, *args, **kwargs): 
     super(DispatcherThread, self).__init__(*args, **kwargs) 
     self.interested_threads = [] 

    def run(self): 
     while 1: 
      if some_condition: 
       self.dispatch_message(some_message) 
      else: 
       sleep(0.1) 

    def register_interest(self, thread): 
     self.interested_threads.append(thread) 

    def dispatch_message(self, message): 
     for thread in self.interested_threads: 
      thread.put_message(message) 



class WorkerThread(Thread): 

    def __init__(self, *args, **kwargs): 
     super(WorkerThread, self).__init__(*args, **kwargs) 
     self.queue = Queue() 


    def run(self): 

     # Tell the dispatcher thread we want messages 
     dispatcher_thread.register_interest(self) 

     while 1: 
      # Wait for next message 
      message = self.queue.get() 

      # Process message 
      # ... 

    def put_message(self, message): 
     self.queue.put(message) 


dispatcher_thread = DispatcherThread() 
dispatcher_thread.start() 

worker_threads = [] 
for i in range(10): 
    worker_thread = WorkerThread() 
    worker_thread.start() 
    worker_threads.append(worker_thread) 

dispatcher_thread.join() 
+0

Perfetto, funziona benissimo! Peccato che non ci sia una versione già pronta, ma immagino che il principio non sia così complicato una volta che qualcuno lo spiega chiaramente (come hai fatto tu). – codebox

+0

@codebox Bene, c'è un supporto migliore nel modulo ['multiprocessing'] (http://docs.python.org/2/library/multiprocessing.html), ma questo è per i sottoprocessi piuttosto che per i thread. Immagino sia perché la comunicazione tra processi è di solito più complessa della comunicazione inter-thread, poiché i thread condividono naturalmente lo stesso heap. – Aya

2

Penso che questo sia un esempio più semplice (presa dal esempio coda in Python Lib)

from threading import Thread 
from Queue import Queue 


num_worker_threads = 2 

def worker(): 
    while True: 
     item = q.get() 
     do_work(item) 
     q.task_done() 

q = Queue() 
for i in range(num_worker_threads): 
    t = Thread(target=worker) 
    t.daemon = True 
    t.start() 

for item in source(): 
    q.put(item) 

q.join()  # block until all tasks are done 
+0

Come soddisfa i requisiti della domanda? Ha detto esplicitamente che la coda non funziona perché ogni thread ha bisogno di una copia dell'articolo. – Wlerin

Problemi correlati