2011-12-14 19 views
33

Sto utilizzando Celery per gestire attività asincrone. Occasionalmente, tuttavia, il processo di sedimentazione si interrompe e non viene eseguito nessuno dei compiti. Vorrei poter controllare lo stato del sedano e accertarmi che tutto funzioni correttamente e, se rilevo problemi, viene visualizzato un messaggio di errore per l'utente. Dalla documentazione di Celery Worker sembra che potrei essere in grado di usare ping o inspect per questo, ma ping sembra hacky e non è chiaro esattamente come deve essere usato inspect (se inspect(). Registered() è vuoto?).Rileva se Celery è disponibile/in esecuzione

Qualsiasi consiglio su questo sarebbe apprezzato. Fondamentalmente quello che sto cercando è un metodo in questo modo:

def celery_is_alive(): 
    from celery.task.control import inspect 
    return bool(inspect().registered()) # is this right?? 

EDIT: Non sembra nemmeno registrati() è disponibile sul sedano 2.3.3 (anche se la lista delle 2.1 documentazione di esso). Forse ping è la risposta giusta.

EDIT: Ping non sembra fare ciò che pensavo avrebbe fatto, quindi non sono ancora sicuro della risposta qui.

+0

ha fatto la risposta qui sotto non funziona per voi? Come qualcuno che ha un problema simile da risolvere, mi piacerebbe qualche conferma. – kojiro

risposta

44

Ecco il codice che ho utilizzato. celery.task.control.Inspect.stats() restituisce un dict contenente molti dettagli sui lavoratori attualmente disponibili, Nessuno se non ci sono lavoratori in esecuzione o solleva uno IOError se non riesce a connettersi al broker dei messaggi. Sto usando RabbitMQ - è possibile che altri sistemi di messaggistica si comportino in modo leggermente diverso. Questo ha funzionato in Celery 2.3.xe 2.4.x; Non sono sicuro di quanto lontano possa andare.

def get_celery_worker_status(): 
    ERROR_KEY = "ERROR" 
    try: 
     from celery.task.control import inspect 
     insp = inspect() 
     d = insp.stats() 
     if not d: 
      d = { ERROR_KEY: 'No running Celery workers were found.' } 
    except IOError as e: 
     from errno import errorcode 
     msg = "Error connecting to the backend: " + str(e) 
     if len(e.args) > 0 and errorcode.get(e.args[0]) == 'ECONNREFUSED': 
      msg += ' Check that the RabbitMQ server is running.' 
     d = { ERROR_KEY: msg } 
    except ImportError as e: 
     d = { ERROR_KEY: str(e)} 
    return d 
+0

Ha funzionato per me :) – kojiro

+6

Ho scoperto che quanto sopra aggiunge due code reply.celery.pidbox a rabbitmq ogni volta che viene eseguito. Ciò porta ad un incremento incrementale nell'utilizzo della memoria di rabbitmq. – kojiro

2

Di seguito ha lavorato per me:

import socket 
from kombu import Connection 

celery_broker_url = "amqp://localhost" 

try: 
    conn = Connection(celery_broker_url) 
    conn.ensure_connection(max_retries=3) 
except socket.error: 
    raise RuntimeError("Failed to connect to RabbitMQ instance at {}".format(celery_broker_url)) 
+2

Sono abbastanza sicuro che questo succederà se rabbitmq è in esecuzione indipendentemente dallo stato del sedano. Ma questo è un buon controllo da fare se il sedano non riesce a sapere se l'errore è con rabbitmq o qualcos'altro. –

Problemi correlati