2013-07-09 8 views
5

L'adattatore tornado di supporto libreria Pika, here è un esempio di come pubblicare un messaggio utilizzando l'adattatore asincrono.Come comunicare RabbitMQ (libreria Pika) nell'applicazione tornado

Voglio usare pika in un'applicazione tornado, solo un esempio, voglio mettere dati di richiesta tornado su RabbitMQ, ma non so come farlo.

Due domande non sanno come risolvere.

1 Pika adattatore uso tornado ha un proprio ioloop,

self._connection = pika.SelectConnection(pika.URLParameters(self._url), 
             self.on_connection_open) 
self._connection.ioloop.start() 

applicazione Tornado ha un proprio ioloop,

tornado.ioloop.IOLoop.instance().start() 

Come combinare questi due ioloop?

2 L'esempio Pika pubblica più volte lo stesso messaggio, ma desidero pubblicare i dati della richiesta, come passare i dati della richiesta per pubblicare il metodo?

risposta

6

Sulla mia ricerca esattamente la stessa cosa ho trovato questo blog post of Kevin Jing Qiu.

Sono andato un po 'più vicino al buco di coniglio per dare a ogni websocket il proprio set di canali e code.

L'estratto dal mio progetto può essere trovato sotto. Un'applicazione tornado legata a RabbitMQ è composta da queste parti:

  1. L'applicazione Tornado che gestirà le richieste web. Vedo solo webs di lunga durata qui, ma puoi farlo anche con richieste HTTP di breve durata.
  2. Una (una) connessione RabbitMQ attesa dall'istanza PikaClient
  3. una connessione Web che definisce i canali, le code e gli scambi quando viene attivato il metodo aperto.

Ora una connessione websocket può ricevere dati dal tornado (dati dal browser) tramite ON_MESSAGE e inviarlo a RabbitMQ.

La connessione web socket riceverà i dati da RabbitMQ tramite basic_consume.

Questo non è completamente funzionante, ma è necessario ottenere l'idea.

class PikaClient(object): 

    def __init__(self, io_loop): 
     logger.info('PikaClient: __init__') 
     self.io_loop = io_loop 

     self.connected = False 
     self.connecting = False 
     self.connection = None 
     self.channel = None 
     self.message_count = 0 
    """ 
    Pika-Tornado connection setup 
    The setup process is a series of callback methods. 
    connect:connect to rabbitmq and build connection to tornado io loop -> 
    on_connected: create a channel to rabbitmq -> 
    on_channel_open: declare queue tornado, bind that queue to exchange 
        chatserver_out and start consuming messages. 
    """ 

    def connect(self): 
     if self.connecting: 
      #logger.info('PikaClient: Already connecting to RabbitMQ') 
      return 

     #logger.info('PikaClient: Connecting to RabbitMQ') 
     self.connecting = True 

     cred = pika.PlainCredentials('guest', 'guest') 
     param = pika.ConnectionParameters(
      host='localhost', 
      port=5672, 
      virtual_host='/', 
      credentials=cred 
     ) 
     self.connection = TornadoConnection(param, 
      on_open_callback=self.on_connected,stop_ioloop_on_close=False) 
     self.connection.add_on_close_callback(self.on_closed) 

    def on_connected(self, connection): 
     logger.info('PikaClient: connected to RabbitMQ') 
     self.connected = True 
     self.connection = connection 
     # now you are able to call the pika api to do things 
     # this could be exchange setup for websocket connections to 
     # basic_publish to later. 
     self.connection.channel(self.on_channel_open) 

    def on_channel_open(self, channel): 
     logger.info('PikaClient: Channel %s open, Declaring exchange' % channel) 
     self.channel = channel 

    def on_closed(self, connection): 
     logger.info('PikaClient: rabbit connection closed') 
     self.io_loop.stop() 


class MyWebSocketHandler(websocket.WebSocketHandler): 
    def __init__(self): 
     self.status = 'not connected yet' 

    def open(self, *args, **kwargs): 
     self.status = "ws open" 
     self.rabbit_connect() # connect this websocket object to rabbitmq 

    def rabbit_connect(): 
     self.application.pc.connection.channel(self.rabbit_channel_in_ok) 

    def rabbit_channel_in_ok(self,channel): 
     self.channel_in = channel 
     self.channel_in.queue_declare(self.rabbit_declare_ok, 
             exclusive=True,auto_delete=True) 


# and so on... 


handlers = [ your_definitions_here_like_websockets_or_such ] 
settings = { your_settings_here } 
application = tornado.web.Application(handlers,**settings) 

def main(): 
    io_loop = tornado.ioloop.IOLoop.instance() 
    # PikaClient is our rabbitmq consumer 
    pc = PikaClient(io_loop) 
    application.pc = pc 
    application.pc.connect() 
    application.listen(config.tornadoport) 
    try: 
     io_loop.start() 
    except KeyboardInterrupt: 
     io_loop.stop() 

if __name__ == '__main__': 
    main() 
+0

Il collegamento fornito è obsoleto. – FactualHarmony

+0

grazie. e risolto. – itsafire

Problemi correlati