2012-02-29 13 views
12

Sto utilizzando Pika per elaborare i dati da RabbitMQ. Come mi è sembrato di imbattersi in diversi tipi di problemi, ho deciso di scrivere una piccola applicazione di test per vedere come posso gestire i disconnessi.RabbitMQ, Pika e strategia di riconnessione

Ho scritto questo app di prova che non seguente:

  1. Connetti a Broker, riprovare fino a quando il successo
  2. Quando è collegato creare una coda.
  3. Consumare questa coda e mettere risultare in un pitone Queue.Queue (0) voce
  4. ottenere da Queue.Queue (0) e produrre di nuovo nella coda di broker.

Quello che ho notato sono stati 2 problemi:

  1. Quando eseguo il mio script da un host connessione a RabbitMQ su un altro host (all'interno di una VM), allora questo script esce in momenti casuali senza produrre un errore.
  2. Quando eseguo il mio script sullo stesso host su cui è installato RabbitMQ, gira bene e continua a funzionare.

Questo potrebbe essere spiegato a causa di problemi di rete, i pacchetti sono scesi anche se trovo la connessione non molto robusta.

Quando lo script viene eseguito localmente sul server RabbitMQ e mi uccidere il RabbitMQ allora lo script esce con l'errore: "ERRORE pika SelectConnection: Errore socket il 3: 104"

così sembra che non posso ottenere la strategia di riconnessione funzionante come dovrebbe essere. Qualcuno potrebbe dare un'occhiata al codice, quindi guarda cosa sto facendo male?

Grazie,

Jay

#!/bin/python 
import logging 
import threading 
import Queue 
import pika 
from pika.reconnection_strategies import SimpleReconnectionStrategy 
from pika.adapters import SelectConnection 
import time 
from threading import Lock 

class Broker(threading.Thread): 
    def __init__(self): 
     threading.Thread.__init__(self) 
     self.logging = logging.getLogger(__name__) 
     self.to_broker = Queue.Queue(0) 
     self.from_broker = Queue.Queue(0) 
     self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True) 
     self.srs = SimpleReconnectionStrategy() 
     self.properties = pika.BasicProperties(delivery_mode=2) 

     self.connection = None 
     while True: 
      try: 
       self.connection = SelectConnection(self.parameters, self.on_connected, reconnection_strategy=self.srs) 
       break 
      except Exception as err: 
       self.logging.warning('Cant connect. Reason: %s' % err) 
       time.sleep(1) 

     self.daemon=True 
    def run(self): 
     while True: 
      self.submitData(self.from_broker.get(block=True)) 
     pass 
    def on_connected(self,connection): 
     connection.channel(self.on_channel_open) 
    def on_channel_open(self,new_channel): 
     self.channel = new_channel 
     self.channel.queue_declare(queue='sandbox', durable=True) 
     self.channel.basic_consume(self.processData, queue='sandbox')  
    def processData(self, ch, method, properties, body): 
     self.logging.info('Received data from broker') 
     self.channel.basic_ack(delivery_tag=method.delivery_tag) 
     self.from_broker.put(body) 
    def submitData(self,data): 
     self.logging.info('Submitting data to broker.') 
     self.channel.basic_publish(exchange='', 
        routing_key='sandbox', 
        body=data, 
        properties=self.properties) 
if __name__ == '__main__': 
    format=('%(asctime)s %(levelname)s %(name)s %(message)s') 
    logging.basicConfig(level=logging.DEBUG, format=format) 
    broker=Broker() 
    broker.start() 
    try: 
     broker.connection.ioloop.start() 
    except Exception as err: 
     print err 

risposta

17

Il problema principale con lo script è che si sta interagendo con un singolo canale sia dal thread principale (dove l'ioloop è in esecuzione) e il "Broker" thread (chiama submitData in un ciclo). Questo è not safe.

Inoltre, SimpleReconnectionStrategy non sembra fare nulla di utile. Non provoca una riconnessione se la connessione viene interrotta. Credo che questo sia un bug in Pika: https://github.com/pika/pika/issues/120

Ho tentato di refactoring il codice per farlo funzionare come penso che lo si desidera, ma si è imbattuto in un altro problema. Pika non sembra avere un modo per rilevare l'errore di consegna, il che significa che i dati potrebbero andare persi se la connessione si interrompe. Questo sembra un requisito così ovvio! Come può non esserci modo di rilevare che basic_publish non è riuscito? Ho provato tutti i tipi di cose, comprese le transazioni e add_on_return_callback (tutto ciò sembrava goffo e eccessivamente complicato), ma non è venuto fuori con nulla. Se davvero non c'è modo, Pika sembra essere utile solo in situazioni che possono tollerare la perdita di dati inviati a RabbitMQ o nei programmi che devono essere consumati da RabbitMQ.

Questo non è affidabile, ma per riferimento, ecco qualche codice che risolve il problema multi-thread:

import logging 
import pika 
import Queue 
import sys 
import threading 
import time 
from functools import partial 
from pika.adapters import SelectConnection, BlockingConnection 
from pika.exceptions import AMQPConnectionError 
from pika.reconnection_strategies import SimpleReconnectionStrategy 

log = logging.getLogger(__name__) 

DEFAULT_PROPERTIES = pika.BasicProperties(delivery_mode=2) 


class Broker(object): 

    def __init__(self, parameters, on_channel_open, name='broker'): 
     self.parameters = parameters 
     self.on_channel_open = on_channel_open 
     self.name = name 

    def connect(self, forever=False): 
     name = self.name 
     while True: 
      try: 
       connection = SelectConnection(
        self.parameters, self.on_connected) 
       log.debug('%s connected', name) 
      except Exception: 
       if not forever: 
        raise 
       log.warning('%s cannot connect', name, exc_info=True) 
       time.sleep(10) 
       continue 

      try: 
       connection.ioloop.start() 
      finally: 
       try: 
        connection.close() 
        connection.ioloop.start() # allow connection to close 
       except Exception: 
        pass 

      if not forever: 
       break 

    def on_connected(self, connection): 
     connection.channel(self.on_channel_open) 


def setup_submitter(channel, data_queue, properties=DEFAULT_PROPERTIES): 
    def on_queue_declared(frame): 
     # PROBLEM pika does not appear to have a way to detect delivery 
     # failure, which means that data could be lost if the connection 
     # drops... 
     channel.confirm_delivery(on_delivered) 
     submit_data() 

    def on_delivered(frame): 
     if frame.method.NAME in ['Confirm.SelectOk', 'Basic.Ack']: 
      log.info('submission confirmed %r', frame) 
      # increasing this value seems to cause a higher failure rate 
      time.sleep(0) 
      submit_data() 
     else: 
      log.warn('submission failed: %r', frame) 
      #data_queue.put(...) 

    def submit_data(): 
     log.info('waiting on data queue') 
     data = data_queue.get() 
     log.info('got data to submit') 
     channel.basic_publish(exchange='', 
        routing_key='sandbox', 
        body=data, 
        properties=properties, 
        mandatory=True) 
     log.info('submitted data to broker') 

    channel.queue_declare(
     queue='sandbox', durable=True, callback=on_queue_declared) 


def blocking_submitter(parameters, data_queue, 
     properties=DEFAULT_PROPERTIES): 
    while True: 
     try: 
      connection = BlockingConnection(parameters) 
      channel = connection.channel() 
      channel.queue_declare(queue='sandbox', durable=True) 
     except Exception: 
      log.error('connection failure', exc_info=True) 
      time.sleep(1) 
      continue 
     while True: 
      log.info('waiting on data queue') 
      try: 
       data = data_queue.get(timeout=1) 
      except Queue.Empty: 
       try: 
        connection.process_data_events() 
       except AMQPConnectionError: 
        break 
       continue 
      log.info('got data to submit') 
      try: 
       channel.basic_publish(exchange='', 
          routing_key='sandbox', 
          body=data, 
          properties=properties, 
          mandatory=True) 
      except Exception: 
       log.error('submission failed', exc_info=True) 
       data_queue.put(data) 
       break 
      log.info('submitted data to broker') 


def setup_receiver(channel, data_queue): 
    def process_data(channel, method, properties, body): 
     log.info('received data from broker') 
     data_queue.put(body) 
     channel.basic_ack(delivery_tag=method.delivery_tag) 

    def on_queue_declared(frame): 
     channel.basic_consume(process_data, queue='sandbox') 

    channel.queue_declare(
     queue='sandbox', durable=True, callback=on_queue_declared) 


if __name__ == '__main__': 
    if len(sys.argv) != 2: 
     print 'usage: %s RABBITMQ_HOST' % sys.argv[0] 
     sys.exit() 

    format=('%(asctime)s %(levelname)s %(name)s %(message)s') 
    logging.basicConfig(level=logging.DEBUG, format=format) 

    host = sys.argv[1] 
    log.info('connecting to host: %s', host) 
    parameters = pika.ConnectionParameters(host=host, heartbeat=True) 
    data_queue = Queue.Queue(0) 
    data_queue.put('message') # prime the pump 

    # run submitter in a thread 

    setup = partial(setup_submitter, data_queue=data_queue) 
    broker = Broker(parameters, setup, 'submitter') 
    thread = threading.Thread(target= 
     partial(broker.connect, forever=True)) 

    # uncomment these lines to use the blocking variant of the submitter 
    #thread = threading.Thread(target= 
    # partial(blocking_submitter, parameters, data_queue)) 

    thread.daemon = True 
    thread.start() 

    # run receiver in main thread 
    setup = partial(setup_receiver, data_queue=data_queue) 
    broker = Broker(parameters, setup, 'receiver') 
    broker.connect(forever=True) 
+0

Grazie per aver dedicato del tempo passando attraverso il codice e trovare tutte le problematiche ad essa collegate. Attualmente sto usando http://barryp.org/software/py-amqplib/ che è una libreria più semplice/più semplice ma che soddisfa completamente le mie esigenze. In combinazione con gevent, ho dei risultati davvero interessanti. Di questi tempi non mi annoio più con Pika. –

+1

puoi usare Channel.confirm_delivery() per attendere ack dopo la pubblicazione, una volta chiusa la connessione, verrà timeout, quindi saprai che il messaggio non viene consegnato al broker –

Problemi correlati