Come posso recuperare un elenco di attività in una coda che devono ancora essere elaborate?Recupera un elenco di attività in una coda in Celery
risposta
Penso che l'unico modo per ottenere le attività in attesa sia mantenere un elenco di attività avviate e lasciare che l'attività si rimuova dall'elenco quando viene avviato.
Con rabbitmqctl e list_queues è possibile ottenere una panoramica di come molte attività sono in attesa, ma non i compiti stessa: http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
Se quello che vuoi include il compito in fase di elaborazione, ma non sono ancora finito, è possibile mantenere un elenco di voi compiti e controllare il loro stato:
from tasks import add
result = add.delay(4, 4)
result.ready() # True if finished
Oppure si lascia negozio di sedano i risultati con CELERY_RESULT_BACKEND e verificare quali dei tuoi compiti non sono in là.
MODIFICA: vedere altre risposte per ottenere un elenco di attività in coda.
Si dovrebbe guardare qui: Celery Guide - Inspecting Workers
Fondamentalmente questo:
>>> from celery.task.control import inspect
# Inspect all nodes.
>>> i = inspect()
# Show the items that have an ETA or are scheduled for later processing
>>> i.scheduled()
# Show tasks that are currently active.
>>> i.active()
# Show tasks that have been claimed by workers
>>> i.reserved()
A seconda di ciò che si vuole
L'ho provato, ma è davvero lento (come 1 secondo). Lo sto usando in modo sincrono in un'app tornado per monitorare i progressi, quindi deve essere veloce. – JulienFr
Ciò non restituirà un elenco di attività nella coda che devono ancora essere elaborate. –
Utilizzare 'i.reserved()' per ottenere un elenco di attività in coda. – Banana
Per recuperare compiti da backend, utilizzare questo
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)
ma 'jobs' dà solo il numero di attività in coda – bitnik
se stai usando il rabbino tMQ, utilizzare questo terminale:
sudo rabbitmqctl list_queues
stampa l'elenco delle code con il numero di attività in sospeso. ad esempio:
Listing queues ...
0b27d8c59fba4974893ec22d478a7093 0
0e0a2da9828a48bc86fe993b210d984f 0
[email protected] 0
11926b79e30a4f0a9d95df61b6f402f7 0
15c036ad25884b82839495fb29bd6395 1
[email protected] 0
celery 166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa 0
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6 0
il numero nella colonna di destra è il numero di attività in coda. in precedenza, la coda di sedano ha 166 attività in sospeso.
Ho familiarità con questo quando ho privilegi sudo, ma voglio un utente di sistema non privilegiato per essere in grado di controllare - qualche suggerimento? – sage
Inoltre è possibile reindirizzare questo valore a 'grep -e"^celery \ s "| cut -f2' per estrarre quel '166' se vuoi elaborare quel numero più tardi, ad esempio per le statistiche. – jamesc
Il modulo di ispezione di sedano sembra essere solo a conoscenza delle attività dal punto di vista dei lavoratori. Se si desidera visualizzare i messaggi che si trovano in coda (non ancora estratti dai lavoratori), suggerisco di utilizzare pyrabbit, che può interfacciare con il coniglio aq http api per recuperare tutti i tipi di informazioni dalla coda.
Un esempio può essere trovato qui: Retrieve queue length with Celery (RabbitMQ, Django)
Se non si utilizzano i compiti prioritari, questo è in realtà pretty simple se si sta utilizzando Redis. Per ottenere i conteggi attività:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
Ma, compiti prioritari use a different key in redis, in modo che il quadro completo è leggermente più complicato. L'immagine completa è che è necessario interrogare i redis per ogni priorità dell'attività.In Python (e dal progetto Fiore), questo appare come:
PRIORITY_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]
def make_queue_name_for_pri(queue, pri):
"""Make a queue name for redis
Celery uses PRIORITY_SEP to separate different priorities of tasks into
different queues in Redis. Each queue-priority combination becomes a key in
redis with names like:
- batch1\x06\x163 <-- P3 queue named batch1
There's more information about this in Github, but it doesn't look like it
will change any time soon:
- https://github.com/celery/kombu/issues/422
In that ticket the code below, from the Flower project, is referenced:
- https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135
:param queue: The name of the queue to make a name for.
:param pri: The priority to make a name with.
:return: A name for the queue-priority pair.
"""
if pri not in DEFAULT_PRIORITY_STEPS:
raise ValueError('Priority not in priority steps')
return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else
(queue, '', '')))
def get_queue_length(queue_name='celery'):
"""Get the number of tasks in a celery queue.
:param queue_name: The name of the queue you want to inspect.
:return: the number of items in the queue.
"""
priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in
DEFAULT_PRIORITY_STEPS]
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],
)
return sum([r.llen(x) for x in priority_names])
Se si desidera ottenere un compito vero e proprio, si può usare qualcosa come:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1
Da lì dovrete deserializzare la lista restituita. Nel mio caso sono stato in grado di realizzare questo con qualcosa di simile:
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))
Basta essere avvertito che deserializzazione può prendere un attimo, e avrete bisogno di regolare i comandi sopra per lavorare con diverse priorità.
Sono giunto alla conclusione il modo migliore per ottenere il numero di lavori in coda è utilizzare rabbitmqctl
come è stato suggerito più volte qui. Per consentire a qualsiasi utente scelto per eseguire il comando con sudo
Ho seguito le istruzioni here (I ha risparmiato la modifica della parte profilo come non mi importa digitando sudo prima del comando.)
Ho anche preso grep
e cut
frammento di jamesc e lo avvolse in chiamate di sottoprocesso.
from subprocess import Popen, PIPE
p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtula host"], stdout=PIPE)
p2 = Popen(["grep", "-e", "^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()
print("number of jobs on queue: %i" % int(p3.communicate()[0]))
- 1. Coda temporanea in Celery
- 2. Convertire una coda in elenco
- 3. Attività di rimbalzo di Celery?
- 4. Attività di profilatura di Celery
- 5. Celery: come limitare il numero di attività in coda e interrompere l'alimentazione quando è piena?
- 6. Celery - Come inviare attività dalla macchina remota?
- 7. Recupera il risultato da 'task_id' in Celery da un'attività sconosciuta
- 8. Questa app Web richiede una coda di attività?
- 9. In Java 8, Executors.newWorkStealingPool() fornisce anche una coda di attività?
- 10. Celery/RabbitMQ messaggi non bloccati che bloccano la coda?
- 11. Celery: esecuzione di una serie di attività con dipendenze complesse
- 12. Eliminazione della catena di attività di Celery?
- 13. La sequenza di battiti di Celery include attività obsolete
- 14. Recupera un elenco di elementi correlati di un elenco di elementi in un array di basi
- 15. Celery: concatenamento di attività con più argomenti
- 16. Attività API Google Spostare attività in un elenco diverso
- 17. Elenco attività javascript in Visual Studio Elenco attività
- 18. Celery come eventi pub/sub in rete
- 19. Come riavviare Celery senza ritardi delle attività
- 20. Celery: come posso indirizzare un'attività fallita a una coda di messaggi non recapitati
- 21. Annullare un'attività già in esecuzione con Celery?
- 22. Django Celery ottiene il conteggio delle attività
- 23. Interrompere un'attività in esecuzione in Celery all'interno di django
- 24. Creazione e instradamento dinamico della coda di Celery
- 25. Come posso rinviare l'esecuzione delle attività di Celery?
- 26. Implementazione di code di priorità "inattiva" e "normale" per attività a esecuzione prolungata in Celery
- 27. Coda eventi/attività Multithreading C++
- 28. Come garantire che un'attività di Celery preveda la sovrapposizione delle esecuzioni delle attività di Celery
- 29. Cosa succede alle attività pianificate (eta) di un Celery Worker quando si spegne?
- 30. Perché utilizzare Celery anziché RabbitMQ?
Quale back-end stai usando? – Bacon
RabbitMQ, ma voglio recuperare questa lista all'interno di Python. –