2011-10-27 21 views
15

Come eliminare tutte le attività pianificate e in esecuzione di una specifica coda con sedano in python? Le domande sembra abbastanza straigtforward, ma per aggiungere io non sto cercando il codice di riga di comandoCome eliminare tutte le attività di una coda specifica con il sedano in python?

Ho la seguente riga, che definisce il que e vorrebbe eliminare che que per gestire le attività:

CELERY_ROUTES = {"socialreport.tasks.twitter_save": {"queue": "twitter_save"}} 

A 1 punto nel tempo voglio eliminare tutte le attività in que twitter_save con codice python, magari con una funzione broadcast? Non sono riuscito a trovare la documentazione su questo. È possibile?

risposta

33

solo per aggiornare @ Sam Stoelinga risposta per il sedano 3.1, ora si può fare in questo modo su un terminale:

celery amqp queue.purge <QUEUE_NAME> 

Per Django essere sicuri di iniziare dal file manage.py:

./manage.py celery amqp queue.purge <QUEUE_NAME> 

In caso contrario, assicurarsi che il sedano è in grado di puntare correttamente al broker impostando il flag --broker=.

+0

Grazie per questo lavoro per Celery 3.1 – Gourneau

+0

Avevo bisogno di utilizzare l'argomento '--broker = ...' per puntare a un URL AMQP valido, poiché per qualche motivo il valore configurato in Django settings.py non è stato raccolto . Forse una particolarità del mio setup. – RichVel

+0

@RichVel lo stai eseguendo da./Manage.py'? l'argomento url del broker dovrebbe essere preso dal file 'settings.py'. – Hassek

6

Lol è abbastanza facile, spero che qualcuno possa ancora aiutarmi.

from celery.bin.camqadm import camqadm 
camqadm('queue.purge', queue_name_as_string) 

L'unico problema con questo ho ancora bisogno di fermare il celeryd prima di spurgo que, dopo lo spurgo ho bisogno di eseguire il celeryd nuovo per gestire le attività per la coda. Aggiornerà questa domanda se avrò successo.

Sono riuscito, ma per favore correggimi se questo non è un buon metodo per fermare il sedano, eliminare l'errore e riavviarlo. So che sto usando termine, perché in realtà voglio che sia terminato il compito.

kill_command = "ps auxww | grep 'celeryd -n twitter_save' | awk '{print $2}' | xargs kill -9" 
subprocess.call(kill_command, shell=True) 

camqadm('queue.purge', 'twitter_save') 
rerun_command = "/home/samos/Software/virt_env/twittersyncv1/bin/python %s/manage.py celeryd -n twitter_save -l info -Q twitter_save" % settings.PROJECT_ROOT 

os.popen(rerun_command+' &') 
send_task("socialreport.tasks.twitter_save") 
6

La risposta originale non funziona per Celery 3.1. L'aggiornamento di Hassek è il comando corretto se vuoi farlo dalla riga di comando. Ma se si vuole farlo programmazione, fare questo:

si Supponendo che correva il tuo Sedano applicazione come:

celery_app = Celery(...) 

Poi:

import celery.bin.amqp 
amqp = celery.bin.amqp.amqp(app = celery_app) 
amqp.run('queue.purge', 'name_of_your_queue') 

Questo è utile per i casi in cui si' Abbiamo accodato un sacco di compiti, e un compito incontra una condizione fatale che sai impedirà il resto delle attività da eseguire.

E.g. hai accodato un sacco di attività di crawler web e nel mezzo delle tue attività l'indirizzo IP del tuo server viene bloccato. Non ha senso eseguire il resto delle attività. In tal caso, il tuo compito può eliminare la sua coda.

Problemi correlati