2012-02-22 18 views
14

voglio segnalazioni di processo in poche discussioni, ma sto ottenendo errore durante l'esecuzione di questo codice: desctiptionerrore "tag di consegna sconosciuta" si verifica quando si tenta messaggi ACK RabbitMQ utilizzando pika (python)

from __future__ import with_statement 
import pika 
import sys 
from pika.adapters.blocking_connection import BlockingConnection 
from pika import connection, credentials 
import time 
import threading 
import random 
from pika.adapters.select_connection import SelectConnection 
from pika.connection import Connection 
import traceback 


def doWork(body, args, channel): 


    r = random.random() 
    time.sleep(r * 10) 
    try:   
     channel.basic_ack(delivery_tag=args.delivery_tag) 

    except : 
     traceback.print_exc() 


auth = credentials.PlainCredentials(username="guest", password="guest") 
params = connection.ConnectionParameters(host="localhost", credentials=auth) 
conn = BlockingConnection(params) 
channel = conn.channel() 


while True: 

    time.sleep(0.03)  
    try: 

     method_frame, header_frame, body = channel.basic_get(queue="test_queue") 
     if method_frame.NAME == 'Basic.GetEmpty': 
      continue   

     t = threading.Thread(target=doWork, args=[body, method_frame, channel]) 
     t.setDaemon(True) 
     t.start() 

    except Exception, e: 
     traceback.print_exc() 
     continue 

errore:

 
Traceback (most recent call last): 
    File "C:\work\projects\mq\start.py", line 43, in 
    method_frame, header_frame, body = channel.basic_get(queue="test_queue") 
    File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 318, in basic_get 
    self.basic_get_(self, self._on_basic_get, ticket, queue, no_ack) 
    File "C:\work\projects\mq\libs\pika\channel.py", line 469, in basic_get 
    no_ack=no_ack)) 
    File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 244, in send_method 
    self.connection.process_data_events() 
    File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 94, in process_data_events 
    self._handle_read() 
    File "C:\work\projects\mq\libs\pika\adapters\base_connection.py", line 162, in _handle_read 
    self._on_data_available(data) 
    File "C:\work\projects\mq\libs\pika\connection.py", line 589, in _on_data_available 
    frame)     # Args 
    File "C:\work\projects\mq\libs\pika\callback.py", line 124, in process 
    callback(*args, **keywords) 
    File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 269, in _on_remote_close 
    frame.method.reply_text) 
AMQPChannelError: (406, 'PRECONDITION_FAILED - unknown delivery tag 204') 

Versioni: Pika 0.9.5, 2.6.1 RabbitMQ

+0

Ieri ho cercato di usare libreria di py-amqplib invece pika. Ha funzionato bene. Probabilmente c'è un problema nella libreria pika. – solo117

+1

Se vuoi condividere il tuo codice su più thread, dovresti usare una libreria thread-safe come rabbitpy o amqp-storm. Non sono sicuro che py-amqplib sia protetto da thread. https://github.com/eandersson/amqp-storm – eandersson

risposta

3

non ho una soluzione, ma posso verificare che si verifica utilizzando l'adattatore BlockingConnection .

Essa si verifica in modo coerente quando riconoscere o rifiutare un messaggio che viene riconsegnata in risposta ad una channel.basic_recover()

Pika 0.9.5, 2.2.0 RabbitMQ, python 2.7, e Erlang R14B01

La soluzione che ho in atto è quella di specificare sempre deliver_tag = 0

Ho il sospetto che ciò funzioni solo se il messaggio che stai acking/nacking è l'ultimo che hai letto (in streaming). La biblioteca che sto scrivendo abstracts il messaggio in modo tale che ognuno può essere riconosciuto in modo indipendente, che rompe con questa soluzione.

Qualcuno può confermare se questo è stato corretto o riconosciuto da qualcuno della squadra di Pika ancora? Oppure potrebbe essere un problema con RabbitMQ?

+0

Sto vedendo questo errore con node-amqp, quindi deve essere un problema con RabbitMQ (versione 3.0.2-1). – alexfernandez

0

Dopo aver visto RabbitMQ - upgraded to a new version and got a lot of "PRECONDITION_FAILED unknown delivery tag 1"

ho cambiato di base-consumare per assomigliare a questo:

consumer_tag = channel.basic_consume(
     message_delivery_event, 
     no_ack=True, 
     queue=queue, 
    ) 

Questo ha avuto l'effetto di causare l'errore descritto in riconoscimenti iniziali (non riconsegnato) quando tag di consegna del messaggio è stato specificato La consegna è stata estratta dalla struttura del metodo di consegna del messaggio.

Utilizzando

channel.basic_ack(delivery_tag=0) 

sopprime l'errore in questo caso, anche

Guardando http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2011-July/013664.html fa sembrare come se può essere un problema in RabbitMQ.

26

Il problema probabilmente è che si sta impostando no_ack=True come questo:

consumer_tag = channel.basic_consume(
    message_delivery_event, 
    no_ack=True, 
    queue=queue, 
) 

E poi riconoscendo i messaggi:

channel.basic_ack(delivery_tag=args.delivery_tag) 

Devi scegliere se si vuole riconoscere o meno e impostare il parametro di consumo corretto.

+0

La causa principale del mio codice è il problema di sincronizzazione e di configurazione. Ho un semplice involucro per creare un utente di rabbitmq. Quando si utilizza una coda temporanea (channel.queueDeclare ("", false, true, true, args) .getQueue()), la prossima Delivery deve essere protetta tramite sync in env multi-thread. Ciò significa che se ricevi un messaggio, devi accettarlo prima di consumare qualsiasi altro messaggio. Altrimenti, quando chiami l'ack, genererà un'eccezione e continuerà a generare eccezioni quando consumi ... – DeepNightTwo

+0

Questo era esattamente il problema che stavo avendo, grazie molte. – Rob

+1

Stavo ricevendo questo errore dopo una scarsa unione in cui un messaggio veniva contattato due volte con lo stesso tag di consegna – blockloop

2

C'è un bug nel codice. Condividi un canale attraverso i thread. Questo non è supportato da pika (vedi FAQ).Hai 2 opzioni:

  1. definire la bandiera no_ack=True in basic_get(...) e non utilizzare l'oggetto del canale in funzione del thread doWork(...)
  2. Se avete bisogno di ACK messaggio solo dopo aver finito il tuo lavoro, poi lasciare che il principale thread (il ciclo while True:) gestisce il messaggio ack (e non il thread di lavoro). Di seguito è riportata una versione modificata del tuo codice che lo fa.

    from __future__ import with_statement 
    import pika 
    import sys 
    from pika.adapters.blocking_connection import BlockingConnection 
    from pika import connection, credentials 
    import time 
    import threading 
    import random 
    from pika.adapters.select_connection import SelectConnection 
    from pika.connection import Connection 
    import traceback 
    from Queue import Queue, Empty 
    
    def doWork(body, args, channel, ack_queue): 
        time.sleep(random.random()) 
        ack_queue.put(args.delivery_tag) 
    
    def doAck(channel): 
        while True: 
         try: 
          r = ack_queue.get_nowait() 
         except Empty: 
          r = None 
         if r is None: 
          break 
         try: 
          channel.basic_ack(delivery_tag=r) 
         except: 
          traceback.print_exc() 
    
    auth = credentials.PlainCredentials(username="guest", password="guest") 
    params = connection.ConnectionParameters(host="localhost", credentials=auth) 
    conn = BlockingConnection(params) 
    channel = conn.channel() 
    # Create a queue for the messages that should be ACKed by main thread 
    ack_queue = Queue() 
    
    while True: 
        time.sleep(0.03)  
        try: 
         doAck(channel) 
         method_frame, header_frame, body = channel.basic_get(queue="test_queue") 
         if method_frame.NAME == 'Basic.GetEmpty': 
          continue   
         t = threading.Thread(target=doWork, args=[body, method_frame, channel, ack_queue]) 
         t.setDaemon(True) 
         t.start() 
        except Exception, e: 
         traceback.print_exc() 
         continue 
    
4

Per me, era solo che ho detto la coda che non avevo intenzione di ack, poi ho Acked.

E.g. SBAGLIATO:

channel.basic_consume(callback, queue=queue_name, no_ack=True) 

e poi nella mia callback:

def callback(ch, method, properties, body): 
    # do stuff 
    ch.basic_ack(delivery_tag = method.delivery_tag) 

DESTRA:

channel.basic_consume(callback, queue=queue_name, no_ack=False) 

Linea di fondo: Se si desidera manualmente ACK, impostare no_ack = falso.

Dalla documentazione:

no_ack: (bool) se impostato su true, modalità automatica di riconoscimento saranno utilizzati (vedi http://www.rabbitmq.com/confirms.html)

+0

Grazie. Questo è stato davvero utile. Il problema che vedo è che il nome del parametro (no_ack o noAck in .net) è un po 'confuso. Mi sento come dovrebbe essere chiamato "ack", e se si passa true, riconoscerà il messaggio. –

Problemi correlati