2011-04-04 11 views

risposta

2

Penso che l'unico modo per ottenere le attività in attesa sia mantenere un elenco di attività avviate e lasciare che l'attività si rimuova dall'elenco quando viene avviato.

Con rabbitmqctl e list_queues è possibile ottenere una panoramica di come molte attività sono in attesa, ma non i compiti stessa: http://www.rabbitmq.com/man/rabbitmqctl.1.man.html

Se quello che vuoi include il compito in fase di elaborazione, ma non sono ancora finito, è possibile mantenere un elenco di voi compiti e controllare il loro stato:

from tasks import add 
result = add.delay(4, 4) 

result.ready() # True if finished 

Oppure si lascia negozio di sedano i risultati con CELERY_RESULT_BACKEND e verificare quali dei tuoi compiti non sono in là.

125

MODIFICA: vedere altre risposte per ottenere un elenco di attività in coda.

Si dovrebbe guardare qui: Celery Guide - Inspecting Workers

Fondamentalmente questo:

>>> from celery.task.control import inspect 

# Inspect all nodes. 
>>> i = inspect() 

# Show the items that have an ETA or are scheduled for later processing 
>>> i.scheduled() 

# Show tasks that are currently active. 
>>> i.active() 

# Show tasks that have been claimed by workers 
>>> i.reserved() 

A seconda di ciò che si vuole

+6

L'ho provato, ma è davvero lento (come 1 secondo). Lo sto usando in modo sincrono in un'app tornado per monitorare i progressi, quindi deve essere veloce. – JulienFr

+17

Ciò non restituirà un elenco di attività nella coda che devono ancora essere elaborate. –

+7

Utilizzare 'i.reserved()' per ottenere un elenco di attività in coda. – Banana

9

Per recuperare compiti da backend, utilizzare questo

from amqplib import client_0_8 as amqp 
conn = amqp.Connection(host="localhost:5672 ", userid="guest", 
         password="guest", virtual_host="/", insist=False) 
chan = conn.channel() 
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True) 
+1

ma 'jobs' dà solo il numero di attività in coda – bitnik

28

se stai usando il rabbino tMQ, utilizzare questo terminale:

sudo rabbitmqctl list_queues 

stampa l'elenco delle code con il numero di attività in sospeso. ad esempio:

Listing queues ... 
0b27d8c59fba4974893ec22d478a7093 0 
0e0a2da9828a48bc86fe993b210d984f 0 
[email protected] 0 
11926b79e30a4f0a9d95df61b6f402f7 0 
15c036ad25884b82839495fb29bd6395 1 
[email protected] 0 
celery 166 
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa 0 
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6 0 

il numero nella colonna di destra è il numero di attività in coda. in precedenza, la coda di sedano ha 166 attività in sospeso.

+0

Ho familiarità con questo quando ho privilegi sudo, ma voglio un utente di sistema non privilegiato per essere in grado di controllare - qualche suggerimento? – sage

+0

Inoltre è possibile reindirizzare questo valore a 'grep -e"^celery \ s "| cut -f2' per estrarre quel '166' se vuoi elaborare quel numero più tardi, ad esempio per le statistiche. – jamesc

3

Il modulo di ispezione di sedano sembra essere solo a conoscenza delle attività dal punto di vista dei lavoratori. Se si desidera visualizzare i messaggi che si trovano in coda (non ancora estratti dai lavoratori), suggerisco di utilizzare pyrabbit, che può interfacciare con il coniglio aq http api per recuperare tutti i tipi di informazioni dalla coda.

Un esempio può essere trovato qui: Retrieve queue length with Celery (RabbitMQ, Django)

7

Se non si utilizzano i compiti prioritari, questo è in realtà pretty simple se si sta utilizzando Redis. Per ottenere i conteggi attività:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME 

Ma, compiti prioritari use a different key in redis, in modo che il quadro completo è leggermente più complicato. L'immagine completa è che è necessario interrogare i redis per ogni priorità dell'attività.In Python (e dal progetto Fiore), questo appare come:

PRIORITY_SEP = '\x06\x16' 
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9] 


def make_queue_name_for_pri(queue, pri): 
    """Make a queue name for redis 

    Celery uses PRIORITY_SEP to separate different priorities of tasks into 
    different queues in Redis. Each queue-priority combination becomes a key in 
    redis with names like: 

    - batch1\x06\x163 <-- P3 queue named batch1 

    There's more information about this in Github, but it doesn't look like it 
    will change any time soon: 

     - https://github.com/celery/kombu/issues/422 

    In that ticket the code below, from the Flower project, is referenced: 

     - https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135 

    :param queue: The name of the queue to make a name for. 
    :param pri: The priority to make a name with. 
    :return: A name for the queue-priority pair. 
    """ 
    if pri not in DEFAULT_PRIORITY_STEPS: 
     raise ValueError('Priority not in priority steps') 
    return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else 
           (queue, '', ''))) 


def get_queue_length(queue_name='celery'): 
    """Get the number of tasks in a celery queue. 

    :param queue_name: The name of the queue you want to inspect. 
    :return: the number of items in the queue. 
    """ 
    priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in 
         DEFAULT_PRIORITY_STEPS] 
    r = redis.StrictRedis(
     host=settings.REDIS_HOST, 
     port=settings.REDIS_PORT, 
     db=settings.REDIS_DATABASES['CELERY'], 
    ) 
    return sum([r.llen(x) for x in priority_names]) 

Se si desidera ottenere un compito vero e proprio, si può usare qualcosa come:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1 

Da lì dovrete deserializzare la lista restituita. Nel mio caso sono stato in grado di realizzare questo con qualcosa di simile:

r = redis.StrictRedis(
    host=settings.REDIS_HOST, 
    port=settings.REDIS_PORT, 
    db=settings.REDIS_DATABASES['CELERY'], 
) 
l = r.lrange('celery', 0, -1) 
pickle.loads(base64.decodestring(json.loads(l[0])['body'])) 

Basta essere avvertito che deserializzazione può prendere un attimo, e avrete bisogno di regolare i comandi sopra per lavorare con diverse priorità.

+0

Dopo averlo utilizzato in produzione, ho appreso che [non funziona se si utilizzano attività prioritarie] (https://github.com/celery/kombu/issues/422), a causa della progettazione di Celery. – mlissner

+0

Ho aggiornato quanto sopra per gestire le attività prioritarie. Progresso! – mlissner

1

Sono giunto alla conclusione il modo migliore per ottenere il numero di lavori in coda è utilizzare rabbitmqctl come è stato suggerito più volte qui. Per consentire a qualsiasi utente scelto per eseguire il comando con sudo Ho seguito le istruzioni here (I ha risparmiato la modifica della parte profilo come non mi importa digitando sudo prima del comando.)

Ho anche preso grep e cut frammento di jamesc e lo avvolse in chiamate di sottoprocesso.

from subprocess import Popen, PIPE 
p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtula host"], stdout=PIPE) 
p2 = Popen(["grep", "-e", "^celery\s"], stdin=p1.stdout, stdout=PIPE) 
p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE) 
p1.stdout.close() 
p2.stdout.close() 
print("number of jobs on queue: %i" % int(p3.communicate()[0])) 
Problemi correlati