2012-03-06 15 views

risposta

2

Sembra che la chiamata a handle_delivery stia bloccando, ma potrebbe essere necessario aggiungere un gestore secondario al ciclo di eventi I/O utilizzando add_timeout. Penso che questo sia quello che stai cercando di fare:

""" 
Asyncronous amqp consumer; do our processing via an ioloop timeout 
""" 

import sys 
import time 

from pika.adapters import SelectConnection 
from pika.connection import ConnectionParameters 

connection = None 
channel = None 


def on_connected(connection): 
    print "timed_receive: Connected to RabbitMQ" 
    connection.channel(on_channel_open) 


def on_channel_open(channel_): 
    global channel 
    channel = channel_ 
    print "timed_receive: Received our Channel" 
    channel.queue_declare(queue="test", durable=True, 
          exclusive=False, auto_delete=False, 
          callback=on_queue_declared) 

class TimingHandler(object): 
    count = 0 
    last_count = 0 

    def __init__(self, delay=0): 
     self.start_time = time.time() 
     self.delay = delay 

    def handle_delivery(self, channel, method, header, body): 
     connection.add_timeout(self.delay, self) 

    def __call__(self): 
     self.count += 1 
     if not self.count % 1000: 
      now = time.time() 
      duration = now - self.start_time 
      sent = self.count - self.last_count 
      rate = sent/duration 
      self.last_count = self.count 
      self.start_time = now 
      print "timed_receive: %i Messages Received, %.4f per second" %\ 
        (self.count, rate) 

def on_queue_declared(frame): 
    print "timed_receive: Queue Declared" 
    channel.basic_consume(TimingHandler().handle_delivery, queue='test', no_ack=True) 


if __name__ == '__main__': 

    # Connect to RabbitMQ 
    host = (len(sys.argv) > 1) and sys.argv[1] or '127.0.0.1' 
    connection = SelectConnection(ConnectionParameters(host), 
            on_connected) 
    # Loop until CTRL-C 
    try: 
     # Start our blocking loop 
     connection.ioloop.start() 

    except KeyboardInterrupt: 

     # Close the connection 
     connection.close() 

     # Loop until the connection is closed 
     connection.ioloop.start() 
Problemi correlati