2011-11-20 11 views
11

Poiché nessuno ha fornito una soluzione a this post oltre al fatto che ho un disperato bisogno di soluzioni alternative, ecco la mia situazione e alcune soluzioni/idee astratte per il dibattito.Hack dell'integrazione del sedano Tornado

mio stack:

  1. Tornado
  2. sedano
  3. MongoDB
  4. Redis
  5. RabbitMQ

Il mio problema: trovare un modo per i Tornado di spedire un sedano compito (risolto) a d poi raccogliere in modo asincrono il risultato (qualche idea?).

Scenario 1: (richiesta/mod risposta più webhook)

  • Tornado riceve una richiesta (utente), quindi salva nella memoria locale (o in Redis) un {JobID: (utente) richiesta} di ricordare dove per propagare la risposta, e spara un compito sedano con JobID
  • Quando sedano completa l'operazione, viene eseguita una webhook ad un certo URL e dice tornado che questo JobID ha terminato (più i risultati)
  • Tornado recupera il (utente) richiesta e inoltra una risposta al (utente)

Può succedere questo? Ha qualche logica?

Scenario 2: (tornado più a lungo polling)

  • Tornado invia il compito di sedano e restituisce alcuni dati JSON primarie al client (jQuery)
  • jQuery fa alcune a lungo polling al ricevimento del json primario, ad esempio, ogni x microsecondi e le risposte del tornado in base ad alcuni flag del database. Quando l'attività di celery è completata, questo flag di database è impostato su True, quindi jQuery "loop" è terminato.

È efficiente?

Altre idee/schemi?

risposta

4

Mi sono imbattuto in questa domanda e colpendo i risultati il ​​backend ripetutamente non mi sembrava ottimale. Così ho implementato un Mixin simile al tuo Scenario 1 usando Unix Sockets.

Notifica Tornado non appena l'attività termina (per essere precisi, non appena l'attività successiva in catena viene eseguita) e colpisce solo i risultati backend una volta. Ecco lo link.

+0

Ottimo lavoro Eren! – hymloth

9

La mia soluzione prevede di polling dal tornado a sedano:

class CeleryHandler(tornado.web.RequestHandlerr): 

    @tornado.web.asynchronous 
    def get(self):  

     task = yourCeleryTask.delay(**kwargs) 

     def check_celery_task(): 
      if task.ready(): 
       self.write({'success':True}) 
       self.set_header("Content-Type", "application/json") 
       self.finish() 
      else: 
       tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task) 

     tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task) 

Ecco post su di esso.

+0

la prego di pubblicare il tuo link blog, è stato preso in giù! – vgoklani

+1

Modificato per essere un link archive.org – rbu

8

Ecco la nostra soluzione al problema. Dal momento che cerchiamo i risultati in diversi gestori nella nostra applicazione, abbiamo reso la ricerca di sedano una classe di mixaggio.

Questo rende anche il codice più leggibile con il modello tornado.gen.

from functools import partial 

class CeleryResultMixin(object): 
    """ 
    Adds a callback function which could wait for the result asynchronously 
    """ 
    def wait_for_result(self, task, callback): 
     if task.ready(): 
      callback(task.result) 
     else: 
      # TODO: Is this going to be too demanding on the result backend ? 
      # Probably there should be a timeout before each add_callback 
      tornado.ioloop.IOLoop.instance().add_callback(
       partial(self.wait_for_result, task, callback) 
      ) 


class ARemoteTaskHandler(CeleryResultMixin, tornado.web.RequestHandler): 
    """Execute a task asynchronously over a celery worker. 
    Wait for the result without blocking 
    When the result is available send it back 
    """ 
    @tornado.web.asynchronous 
    @tornado.web.authenticated 
    @tornado.gen.engine 
    def post(self): 
     """Test the provided Magento connection 
     """ 
     task = expensive_task.delay(
      self.get_argument('somearg'), 
     ) 

     result = yield tornado.gen.Task(self.wait_for_result, task) 

     self.write({ 
      'success': True, 
      'result': result.some_value 
     }) 
     self.finish() 
3

Ora, https://github.com/mher/tornado-celery viene in soccorso ...

class GenAsyncHandler(web.RequestHandler): 
    @asynchronous 
    @gen.coroutine 
    def get(self): 
     response = yield gen.Task(tasks.sleep.apply_async, args=[3]) 
     self.write(str(response.result)) 
     self.finish()