2012-05-22 19 views
6

Utilizzo Django e Celery e sto provando a configurare il routing su più code. Quando si specifica l'attività routing_key e exchange (nel decoratore di attività o utilizzando apply_async()), l'attività non viene aggiunta al broker (che è Kombu che si connette al mio database MySQL).Django & Celery - Problemi di routing

Se si specifica il nome della coda nel decoratore di attività (che significa che la chiave di instradamento è ignorata), l'attività funziona correttamente. Sembra essere un problema con l'impostazione di routing/scambio.

Qualche idea di quale potrebbe essere il problema?

Ecco il programma di installazione:

settings.py

INSTALLED_APPS = (
    ... 
    'kombu.transport.django', 
    'djcelery', 
) 
BROKER_BACKEND = 'django' 
CELERY_DEFAULT_QUEUE = 'default' 
CELERY_DEFAULT_EXCHANGE = "tasks" 
CELERY_DEFAULT_EXCHANGE_TYPE = "topic" 
CELERY_DEFAULT_ROUTING_KEY = "task.default" 
CELERY_QUEUES = { 
    'default': { 
     'binding_key':'task.#', 
    }, 
    'i_tasks': { 
     'binding_key':'important_task.#', 
    }, 
} 

tasks.py

from celery.task import task 

@task(routing_key='important_task.update') 
def my_important_task(): 
    try: 
     ... 
    except Exception as exc: 
     my_important_task.retry(exc=exc) 

Iniziato compito:

from tasks import my_important_task 
my_important_task.delay() 
+0

Come passate routing_key ? Con async_apply? – mher

+0

Sto usando il metodo 'delay()', che è solo una scorciatoia per 'apply_async()'. Sto cercando di mantenere le specifiche 'routing_key' con il metodo task (tramite il decoratore) invece di quando viene chiamato. Ho provato a passare la chiave usando 'apply_async()', ma ho lo stesso problema. Il ritardo –

+0

non accetta la parola chiave routing_key. È una versione semplificata di apply_async ma non sono la stessa cosa. – mher

risposta

43

Si utilizza il Django ORM come un mediatore, il che significa che le dichiarazioni vengono memorizzate solo nella memoria (vedi, indiscutibilmente difficile da trovare, tabella di comparazione di trasporto a http://readthedocs.org/docs/kombu/en/latest/introduction.html#transport-comparison)

Quindi, quando si applica questo compito con routing_key important_task.update non sarà in grado di instradarlo, perché non ha ancora dichiarato la coda.

Esso funziona se si esegue questa operazione:

@task(queue="i_tasks", routing_key="important_tasks.update") 
def important_task(): 
    print("IMPORTANT") 

Ma sarebbe molto più semplice per utilizzare la funzionalità di routing automatico, dal momento che qui non c'è niente che vi mostra necessario utilizzare un ' scambio soggetto', utilizzare routing automatico semplicemente rimuovere le impostazioni:

  • CELERY_DEFAULT_QUEUE,
  • CELERY_DEFAULT_EXCHANGE,
  • CELERY_DEFAULT_EXCHANGE_TYPE
  • CELERY_DEFAULT_ROUTING_KEY
  • CELERY_QUEUES

E dichiarare il vostro compito in questo modo:

@task(queue="important") 
def important_task(): 
    return "IMPORTANT" 

e poi per avviare un lavoratore che consumano da quella coda:

$ python manage.py celeryd -l info -Q important 

o di consumare sia dal (celery) coda di default e la coda important:

$ python manage.py celeryd -l info -Q celery,important 

Un'altra buona pratica è quella di non hardcode i nomi delle code nel compito e utilizzare CELERY_ROUTES invece :

@task 
def important_task(): 
    return "DEFAULT" 

quindi nelle impostazioni:

CELERY_ROUTES = {"myapp.tasks.important_task": {"queue": "important"}} 

Se ancora insistete sul usando scambi argomento allora si potrebbe aggiungere questo router di dichiarare automaticamente tutte le code per la prima volta un task viene inviato:

class PredeclareRouter(object): 
    setup = False 

    def route_for_task(self, *args, **kwargs): 
     if self.setup: 
      return 
     self.setup = True 
     from celery import current_app, VERSION as celery_version 
     # will not connect anywhere when using the Django transport 
     # because declarations happen in memory. 
     with current_app.broker_connection() as conn: 
      queues = current_app.amqp.queues 
      channel = conn.default_channel 
      if celery_version >= (2, 6): 
       for queue in queues.itervalues(): 
        queue(channel).declare() 
      else: 
       from kombu.common import entry_to_queue 
       for name, opts in queues.iteritems(): 
        entry_to_queue(name, **opts)(channel).declare() 
CELERY_ROUTES = (PredeclareRouter(),) 
+0

Grazie per la spiegazione! –

+2

Questo problema riguarda le dichiarazioni e gli scambi delle code risolti in Celery 3? Sto usando il nuovo 'CELERY_QUEUES = (Queue (...), ...)' nelle impostazioni, vuol dire che le code vengono dichiarate correttamente? –

+0

Nota: in Celery 4.0 in poi, CELERY_ROUTES è stato sostituito con CELERY_TASK_ROUTES. Potrebbe salvare il tempo di qualcuno. –

Problemi correlati