Desidero consumare una coda (RabbitMQ) in modo sincrono con il blocco.Consumo sincrono e di blocco in RabbitMQ utilizzando pika
Nota: di seguito è riportato il codice completo pronto per essere eseguito.
L'installazione del sistema utilizza RabbitMQ come sistema di accodamento, ma il consumo asincrono non è necessario in uno dei nostri moduli.
Ho provato con basic_get sulla cima di una BlockingConnection, che non blocca (rendimenti (None, None, None)
immediatamente):
# declare queue
get_connection().channel().queue_declare(TEST_QUEUE)
def blocking_get_1():
channel = get_connection().channel()
# get from an empty queue (prints immediately)
print channel.basic_get(TEST_QUEUE)
Ho anche provato ad utilizzare il consume generator, non riesce con "Connessione chiusa" dopo un lungo periodo di non consumo.
def blocking_get_2():
channel = get_connection().channel()
# put messages in TEST_QUEUE
for i in range(4):
channel.basic_publish(
'',
TEST_QUEUE,
'body %d' % i
)
consume_generator = channel.consume(TEST_QUEUE)
print next(consume_generator)
time.sleep(14400)
print next(consume_generator)
C'è un modo per utilizzare RabbitMQ utilizzando il client Pika, come se fosse un Queue.Queue
in Python? o qualcosa di simile?
La mia opzione al momento è busy-wait (utilizzando basic_get) - ma preferisco usare il sistema esistente per non essere occupato-aspettare, se possibile.
codice completo:
#!/usr/bin/env python
import pika
import time
TEST_QUEUE = 'test'
def get_connection():
# define connection
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=YOUR_IP,
port=YOUR_PORT,
credentials=pika.PlainCredentials(
username=YOUR_USER,
password=YOUR_PASSWORD,
)
)
)
return connection
# declare queue
get_connection().channel().queue_declare(TEST_QUEUE)
def blocking_get_1():
channel = get_connection().channel()
# get from an empty queue (prints immediately)
print channel.basic_get(TEST_QUEUE)
def blocking_get_2():
channel = get_connection().channel()
# put messages in TEST_QUEUE
for i in range(4):
channel.basic_publish(
'',
TEST_QUEUE,
'body %d' % i
)
consume_generator = channel.consume(TEST_QUEUE)
print next(consume_generator)
time.sleep(14400)
print next(consume_generator)
print "blocking_get_1"
blocking_get_1()
print "blocking_get_2"
blocking_get_2()
get_connection().channel().queue_delete(TEST_QUEUE)
Penso che abbia anche a che fare con l'invio dell'heartbeat ('consumare' forse li blocca?) Come visto qui: http://stackoverflow.com/questions/14572020/handling-long-running-tasks-in- pika-rabbitmq –
Ho postato la mia opinione su questo, ma fammi sapere se ho frainteso la tua domanda. :) – eandersson