2015-03-30 13 views
7

Utilizzo un server Web di Tornado per mettere in coda gli articoli che devono essere elaborati al di fuori del ciclo di richiesta/risposta.Creazione di una coda di elaborazione in Tornado

Nel mio esempio semplificato di seguito, ogni volta che arriva una richiesta, aggiungo una nuova stringa a un elenco chiamato queued_items. Voglio creare qualcosa che guarderà quell'elenco ed elaborerà gli oggetti così come appaiono in esso.

(Nel mio codice reale, gli articoli vengono elaborati e inviati tramite un socket TCP che può essere connesso o meno quando arriva la richiesta Web. Desidero che il server Web continui ad accodare gli elementi indipendentemente dalla connessione socket)

Sto cercando di mantenere questo codice semplice e di non utilizzare code esterne/programmi come Redis o Beanstalk. Non avrà un volume molto alto.

Qual è un buon modo utilizzando gli idiomi di Tornado per guardare l'elenco client.queued_items per i nuovi articoli ed elaborarli all'arrivo?

import time 

import tornado.ioloop 
import tornado.gen 
import tornado.web 

class Client(): 

    def __init__(self): 
     self.queued_items = [] 

    @tornado.gen.coroutine 
    def watch_queue(self): 
     # I have no idea what I'm doing 
     items = yield client.queued_items 
     # go_do_some_thing_with_items(items) 

class IndexHandler(tornado.web.RequestHandler): 

    def get(self): 
     client.queued_items.append("%f" % time.time()) 
     self.write("Queued a new item") 

if __name__ == "__main__": 

    client = Client() 

    # Watch the queue for when new items show up 
    client.watch_queue() 

    # Create the web server 
    application = tornado.web.Application([ 
     (r'/', IndexHandler), 
    ], debug=True) 

    application.listen(8888) 
    tornado.ioloop.IOLoop.instance().start() 

risposta

11

C'è una libreria chiamata toro, che fornisce primitive di sincronizzazione per tornado. [Aggiornamento: Come di tornado 4.2, toro è stata fusa in tornado.]

Suona come si potrebbe utilizzare un toro.Queue (o tornado.queues.Queue in tornado 4.2+) per gestire questa situazione:

import time 

import toro 
import tornado.ioloop 
import tornado.gen 
import tornado.web 

class Client(): 

    def __init__(self): 
     self.queued_items = toro.Queue() 

    @tornado.gen.coroutine 
    def watch_queue(self): 
     while True: 
      items = yield self.queued_items.get() 
      # go_do_something_with_items(items) 

class IndexHandler(tornado.web.RequestHandler): 

    @tornado.gen.coroutine 
    def get(self): 
     yield client.queued_items.put("%f" % time.time()) 
     self.write("Queued a new item") 

if __name__ == "__main__": 

    client = Client() 

    # Watch the queue for when new items show up 
    tornado.ioloop.IOLoop.instance().add_callback(client.watch_queue) 

    # Create the web server 
    application = tornado.web.Application([ 
     (r'/', IndexHandler), 
    ], debug=True) 

    application.listen(8888) 
    tornado.ioloop.IOLoop.instance().start() 

Ci sono alcune modifiche necessarie, a parte commutazione della struttura di dati da un elenco per un toro.Queue:

  1. Dobbiamo pianificare watch_queue per eseguire all'interno di IOLoop utilizzando add_callback, anziché cercare di chiamarlo direttamente al di fuori di un contesto IOLoop.
  2. IndexHandler.get deve essere convertito in una coroutine, perché toro.Queue.put è una coroutine.

Ho anche aggiunto un ciclo while True-watch_queue, in modo che venga eseguito per sempre, e non solo l'elaborazione di un elemento e poi uscire.

+0

Questo è esattamente ciò di cui avevo bisogno. Grazie per avermi mostrato come implementarlo. – Scott

+0

Dano - Come posso smettere di guardare la coda? Quando la mia connessione va male, dovrò interrompere temporaneamente l'elaborazione degli articoli in coda ma non perderli. – Scott

+1

toro è stato fuso in tornado ed è ora deprecato. Per tornado> = 4.2 puoi usare 'tornado.queues.Queue' –

Problemi correlati