2012-10-23 7 views
9

Vorrei verificare se un consumatore/lavoratore è presente per consumare un messaggio che sto per inviare.In Pika o RabbitMQ, come posso controllare se alcuni consumatori stanno consumando attualmente?

Se non c'è alcun Worker, vorrei iniziare alcuni lavoratori (sia i consumatori che gli editori sono su una singola macchina) e poi andare sulla pubblicazione Messaggi.

Se c'è una funzione come connection.check_if_has_consumers, vorrei implementare è un po 'come questo -

import pika 
import workers 

# code for publishing to worker queue 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 
channel = connection.channel() 

# if there are no consumers running (would be nice to have such a function) 
if not connection.check_if_has_consumers(queue="worker_queue", exchange=""): 
    # start the workers in other processes, using python's `multiprocessing` 
    workers.start_workers() 

# now, publish with no fear of your queues getting filled up 
channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True) 
channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin", 
          properties=pika.BasicProperties(delivery_mode=2)) 
connection.close() 

Ma io sono in grado di trovare qualsiasi funzione con check_if_has_consumers funzionalità in pika.

C'è un modo per realizzare questo, utilizzando pika? o forse, per parlando a The Rabbit direttamente?

io non sono del tutto sicuro, ma penso davvero RabbitMQ sarebbe a conoscenza del numero di consumatori sottoscritto diverse code, dal momento che fa inviare messaggi a loro e accetta acks

ho appena è cominciato con RabbitMQ 3 ore fa ... ogni aiuto è benvenuto ...

ecco il codice workers.py ho scritto, se il suo aiuto ....

import multiprocessing 
import pika 


def start_workers(num=3): 
    """start workers as non-daemon processes""" 
    for i in xrange(num):  
     process = WorkerProcess() 
     process.start() 


class WorkerProcess(multiprocessing.Process): 
    """ 
    worker process that waits infinitly for task msgs and calls 
    the `callback` whenever it gets a msg 
    """ 
    def __init__(self): 
     multiprocessing.Process.__init__(self) 
     self.stop_working = multiprocessing.Event() 

    def run(self): 
     """ 
     worker method, open a channel through a pika connection and 
     start consuming 
     """ 
     connection = pika.BlockingConnection(
           pika.ConnectionParameters(host='localhost') 
        ) 
     channel = connection.channel() 
     channel.queue_declare(queue='worker_queue', auto_delete=False, 
                durable=True) 

     # don't give work to one worker guy until he's finished 
     channel.basic_qos(prefetch_count=1) 
     channel.basic_consume(callback, queue='worker_queue') 

     # do what `channel.start_consuming()` does but with stopping signal 
     while len(channel._consumers) and not self.stop_working.is_set(): 
      channel.transport.connection.process_data_events() 

     channel.stop_consuming() 
     connection.close() 
     return 0 

    def signal_exit(self): 
     """exit when finished with current loop""" 
     self.stop_working.set() 

    def exit(self): 
     """exit worker, blocks until worker is finished and dead""" 
     self.signal_exit() 
     while self.is_alive(): # checking `is_alive()` on zombies kills them 
      time.sleep(1) 

    def kill(self): 
     """kill now! should not use this, might create problems""" 
     self.terminate() 
     self.join() 


def callback(channel, method, properties, body): 
    """pika basic consume callback""" 
    print 'GOT:', body 
    # do some heavy lifting here 
    result = save_to_database(body) 
    print 'DONE:', result 
    channel.basic_ack(delivery_tag=method.delivery_tag) 

EDIT:

devo andare avanti: ecco una soluzione che mi accingo a fare, a meno che un approccio migliore arriva,

Quindi, RabbitMQ ha queste HTTP management apis, funzionano dopo aver acceso il management plugin e al centro della pagina API HTTP c'è

/api/connessioni - Un elenco di tutte le connessioni aperte.

/api/connessioni/nome: una connessione singola. ELIMINANDO chiuderà la connessione.

Quindi, se collego il mio lavoratori e la mia produce sia da diversi Connection nomi/utenti, sarò in grado di verificare se il collegamento Worker è aperto ... (c'è potrebbero essere problemi quando il lavoratore muore ...)

sarà in attesa di una soluzione migliore ...

EDIT:

appena trovato questo nella documentazione RabbitMQ, ma questo sarebbe hacky fare in python:

[email protected]:~$ sudo rabbitmqctl -p vhostname list_queues name consumers 
Listing queues ... 
worker_queue 0 
...done. 

così ho potuto fare qualcosa di simile,

subprocess.call("echo password|sudo -S rabbitmqctl -p vhostname list_queues name consumers | grep 'worker_queue'") 

hacky ... spero che pika abbia qualche funzione python per farlo ...

Grazie,

risposta

7

Stavo anche esaminando questo. Dopo la lettura attraverso la sorgente e documentazione mi sono imbattuto nel seguente in channel.py:

@property 
def consumer_tags(self): 
    """Property method that returns a list of currently active consumers 

    :rtype: list 

    """ 
    return self._consumers.keys() 

mio test ha avuto successo. Ho usato il seguente in cui il mio oggetto canale è self._channel:

if len(self._channel.consumer_tags) == 0: 
     LOGGER.info("Nobody is listening. I'll come back in a couple of minutes.") 
     ... 
0

In realtà ho trovato questo in caso di incidente alla ricerca di un problema diverso, ma una cosa che può aiutarti è nella funzione Basic_Publish, c'è un parametro "Immediato" che è impostato su False.

Un'idea che si possa fare è quello di impostare il flag immediata su True, che richiederà di essere consumato da un consumatore immediatamente, invece di sedersi in una coda. Se un lavoratore non è disponibile a consumare il messaggio, restituirà un errore, che ti dice di avviare un altro lavoratore.

A seconda del throughput del sistema, questo potrebbe generare molti lavoratori in più o generare lavoratori per sostituire i lavoratori morti. Per il precedente problema è possibile scrivere un sistema simile ad un amministratore che semplicemente tiene traccia dei lavoratori tramite una coda di controllo, dove è possibile indicare un processo simile a un "corridore" per uccidere i processi dei lavoratori che ora non sono più necessari.

Problemi correlati