2012-07-02 21 views
7

Ho bisogno di sapere quando una coda è chiusa e non ho più elementi in modo da poter terminare l'iterazione.Coda iterabile Python

ho fatto mettendo una sentinella nella coda:

from Queue import Queue 

class IterableQueue(Queue): 

    _sentinel = object() 

    def __iter__(self): 
     return self 

    def close(self): 
     self.put(self._sentinel) 

    def next(self): 
     item = self.get() 
     if item is self._sentinel: 
      raise StopIteration 
     else: 
      return item 

Dato che questo è un uso molto comune per una coda, non v'è alcuna implementazione incorporato?

+0

I utilizzare la sentinella, o una bandiera all'interno del filo per fermare l'iterazione tramite la coda. Per il dopo, di solito attendo con un timeout. – jdi

risposta

10

una sentinella è un modo ragionevole per un produttore per inviare un messaggio che nessuna attività di coda più sono imminenti.

FWIW, il codice può essere semplificata un po 'con la forma a due argomenti di iter():

from Queue import Queue 

class IterableQueue(Queue): 

    _sentinel = object() 

    def __iter__(self): 
     return iter(self.get, self._sentinel) 

    def close(self): 
     self.put(self._sentinel) 
4

Il modulo di multiprocessing ha una propria versione di Queue che include un metodo close. Non sono sicuro di come funzioni nel threading, ma vale la pena provarlo. Non vedo perché non dovrebbe funzionare lo stesso:

from multiprocessing import Queue 

q = Queue() 
q.put(1) 
q.get_nowait() 
# 1 
q.close() 
q.get_nowait() 
# ... 
# IOError: handle out of range in select() 

si può solo prendere l'IOError come segnale di chiusura.

TEST

from multiprocessing import Queue 
from threading import Thread 

def worker(q): 
    while True: 
     try: 
      item = q.get(timeout=.5) 
     except IOError: 
      print "Queue closed. Exiting thread." 
      return 
     except: 
      continue 
     print "Got item:", item 

q = Queue() 
for i in xrange(3): 
    q.put(i) 
t = Thread(target=worker, args=(q,)) 
t.start() 
# Got item: 0 
# Got item: 1 
# Got item: 2 
q.close() 
# Queue closed. Exiting thread. 

anche se ad essere onesti, non è troppo diversa da quella impostazione di un flag sulla Queue.Queue. Il multiprocessing.Queue è solo utilizzando un descrittore di file chiuso come una bandiera:

from Queue import Queue 

def worker2(q): 
    while True: 
     if q.closed: 
      print "Queue closed. Exiting thread." 
      return 
     try: 
      item = q.get(timeout=.5) 
     except: 
      continue 
     print "Got item:", item 

q = Queue() 
q.closed = False 
for i in xrange(3): 
    q.put(i) 
t = Thread(target=worker2, args=(q,)) 
t.start() 
# Got item: 0 
# Got item: 1 
# Got item: 2 
q.closed = True 
# Queue closed. Exiting thread. 
Problemi correlati