2014-10-21 12 views
7

Desidero consumare una coda (RabbitMQ) in modo sincrono con il blocco.Consumo sincrono e di blocco in RabbitMQ utilizzando pika

Nota: di seguito è riportato il codice completo pronto per essere eseguito.

L'installazione del sistema utilizza RabbitMQ come sistema di accodamento, ma il consumo asincrono non è necessario in uno dei nostri moduli.

Ho provato con basic_get sulla cima di una BlockingConnection, che non blocca (rendimenti (None, None, None) immediatamente):

# declare queue 
get_connection().channel().queue_declare(TEST_QUEUE) 
def blocking_get_1(): 

     channel = get_connection().channel() 

     # get from an empty queue (prints immediately) 
     print channel.basic_get(TEST_QUEUE) 

Ho anche provato ad utilizzare il consume generator, non riesce con "Connessione chiusa" dopo un lungo periodo di non consumo.

def blocking_get_2(): 
     channel = get_connection().channel() 
     # put messages in TEST_QUEUE 
     for i in range(4): 
       channel.basic_publish(
         '', 
         TEST_QUEUE, 
         'body %d' % i 
       ) 
     consume_generator = channel.consume(TEST_QUEUE) 
     print next(consume_generator) 
     time.sleep(14400) 
     print next(consume_generator) 

C'è un modo per utilizzare RabbitMQ utilizzando il client Pika, come se fosse un Queue.Queue in Python? o qualcosa di simile?

La mia opzione al momento è busy-wait (utilizzando basic_get) - ma preferisco usare il sistema esistente per non essere occupato-aspettare, se possibile.

codice completo:

#!/usr/bin/env python 
import pika 
import time 

TEST_QUEUE = 'test' 
def get_connection(): 
     # define connection 
     connection = pika.BlockingConnection(
       pika.ConnectionParameters(
         host=YOUR_IP, 
         port=YOUR_PORT, 
         credentials=pika.PlainCredentials(
           username=YOUR_USER, 
           password=YOUR_PASSWORD, 
         ) 
       ) 
     ) 
     return connection 

# declare queue 
get_connection().channel().queue_declare(TEST_QUEUE) 
def blocking_get_1(): 

     channel = get_connection().channel() 

     # get from an empty queue (prints immediately) 
     print channel.basic_get(TEST_QUEUE) 

def blocking_get_2(): 
     channel = get_connection().channel() 
     # put messages in TEST_QUEUE 
     for i in range(4): 
       channel.basic_publish(
         '', 
         TEST_QUEUE, 
         'body %d' % i 
       ) 
     consume_generator = channel.consume(TEST_QUEUE) 
     print next(consume_generator) 
     time.sleep(14400) 
     print next(consume_generator) 


print "blocking_get_1" 
blocking_get_1() 

print "blocking_get_2" 
blocking_get_2() 

get_connection().channel().queue_delete(TEST_QUEUE) 
+0

Penso che abbia anche a che fare con l'invio dell'heartbeat ('consumare' forse li blocca?) Come visto qui: http://stackoverflow.com/questions/14572020/handling-long-running-tasks-in- pika-rabbitmq –

+1

Ho postato la mia opinione su questo, ma fammi sapere se ho frainteso la tua domanda. :) – eandersson

risposta

10

Un problema comune con Pika è che al momento non è la gestione degli eventi in arrivo in background. Ciò significa che in molti casi è necessario chiamare periodicamente lo connection.process_data_events() per assicurarsi che non salti gli heartbeat.

Questo significa anche che se si dorme per un lungo periodo di tempo, Pika non gestirà i dati in arrivo e alla fine morirà poiché non risponde ai battiti del cuore. Un'opzione qui è di disabilitare i battiti cardiaci.

Solitamente risolvo questo problema avendo un thread in background per verificare i nuovi eventi, come mostrato nell'esempio this.

Se si desidera bloccare completamente, farei qualcosa di simile (basato sulla mia libreria AMQP-Storm).

while True: 
    result = channel.basic.get(queue='simple_queue', no_ack=False) 
    if result: 
     print("Message:", result['body']) 
     channel.basic.ack(result['method']['delivery_tag']) 
    else: 
     print("Channel Empty.") 
     sleep(1) 

Questo è basato sull'esempio trovato here.

+0

Ricordo di aver avuto problemi durante l'accesso alla connessione da due thread. La comunicazione inter-thread aggiunge un sovraccarico, quindi aspetterò un modo per farlo senza di esso. Gli darò un altro andare più tardi e aggiornare qui. –

+2

Sì, se stai usando pika può essere difficile. Non è progettato per il threading, ma l'esempio che ho collegato può gestire un bel po 'di messaggi simultanei. La mia libreria amqp-storm d'altra parte dovrebbe renderla più facile, dato che è thread-safe. – eandersson

Problemi correlati