2012-09-14 12 views
11

Ho un client python worker che fa girare 10 worker che si collegano a una coda RabbitMQ. Un po 'come questo:Pika + RabbitMQ: l'impostazione basic_qos to prefetch = 1 appare ancora per consumare tutti i messaggi in coda

#!/usr/bin/python 
worker_count=10 

def mqworker(queue, configurer): 
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost')) 
    channel = connection.channel() 
    channel.queue_declare(queue=qname, durable=True) 
    channel.basic_consume(callback,queue=qname,no_ack=False) 
    channel.basic_qos(prefetch_count=1) 
    channel.start_consuming() 


def callback(ch, method, properties, body): 
    doSomeWork(); 
    ch.basic_ack(delivery_tag = method.delivery_tag) 

if __name__ == '__main__': 
    for i in range(worker_count): 
     worker = multiprocessing.Process(target=mqworker) 
     worker.start() 

Il problema che ho è che, nonostante l'impostazione basic_qos sul canale, il primo lavoratore di iniziare accetta tutti i messaggi dalla coda, mentre gli altri si siedono lì inattivo. Riesco a vederlo nell'interfaccia rabbitmq, anche quando imposto lo worker_count come 1 e scarico 50 messaggi in coda, tutti i 50 entrano nel bucket "non riconosciuto", mentre mi aspetto che 1 diventi non riconosciuto e gli altri 49 per Sii pronto.

Perché non funziona?

risposta

14

Mi sembra di aver risolto questo spostando dove viene chiamato basic_qos.

Il posizionamento subito dopo channel = connection.channel() sembra alterare il comportamento a ciò che mi aspetterei.

+0

grazie! questo ha risolto il problema. e btw questo è molto difficile da eseguire il debug. – Sajuuk

+0

@Hiagara yeah mi sono imbattuto in questo oggi stesso. Incredibile che quasi 5 anni dopo ciò non è ancora chiaro o documentato nell'API. – Jordan

Problemi correlati