2013-03-01 12 views
10

In base al tutorial di sedici su real-time monitoring of celery workers, è possibile anche acquisire a livello di codice gli eventi prodotti dagli operatori e agire di conseguenza.Come monitorare gli eventi da lavoratori in un'applicazione Celery-Django?

La mia domanda è: come posso integrare un monitor come quello nell'esempio this in un'applicazione Celery-Django?

EDIT: L'esempio di codice nel tutorial si presenta come:

from celery import Celery 

def my_monitor(app): 
    state = app.events.State() 

    def announce_failed_tasks(event): 
     state.event(event) 
     task_id = event['uuid'] 

     print('TASK FAILED: %s[%s] %s' % (
      event['name'], task_id, state[task_id].info(),)) 
    with app.connection() as connection: 
     recv = app.events.Receiver(connection, handlers={ 
       'task-failed': announce_failed_tasks, 
       'worker-heartbeat': announce_dead_workers, 
     }) 
     recv.capture(limit=None, timeout=None, wakeup=True) 

if __name__ == '__main__': 
    celery = Celery(broker='amqp://[email protected]//') 
    my_monitor(celery) 

quindi voglio catturare evento task_failed inviato dal lavoratore, e per ottenere il suo task_id come gli spettacoli del tutorial, per ottenere il risultato questa attività dal risultato-backend che è stato configurato per la mia applicazione e lo elabora ulteriormente. Il mio problema è che non è ovvio per me come ottenere l'applicazione, poiché in un progetto di django-sedano non è trasparente per me l'istanziazione della libreria di Celery.

Sono inoltre aperto a qualsiasi altra idea su come elaborare i risultati quando un operatore ha terminato l'esecuzione di un'attività.

+0

penso che dovrete essere un po 'più specifico, quali eventi hanno bisogno di catturare? Hai qualche codice di esempio? – danodonovan

risposta

14

Ok, ho trovato un modo per farlo, anche se non sono sicuro che questa sia la soluzione, ma funziona per me. La funzione di monitoraggio si collega fondamentalmente direttamente al broker e ascolta i diversi tipi di eventi. Il mio codice è simile al seguente:

from celery.events import EventReceiver 
from kombu import Connection as BrokerConnection 

def my_monitor: 
    connection = BrokerConnection('amqp://guest:[email protected]:5672//') 

    def on_event(event): 
     print "EVENT HAPPENED: ", event 

    def on_task_failed(event): 
     exception = event['exception'] 
     print "TASK FAILED!", event, " EXCEPTION: ", exception 

    while True: 
     try: 
      with connection as conn: 
       recv = EventReceiver(conn, 
           handlers={'task-failed' : on_task_failed, 
              'task-succeeded' : on_event, 
              'task-sent' : on_event, 
              'task-received' : on_event, 
              'task-revoked' : on_event, 
              'task-started' : on_event, 
              # OR: '*' : on_event 
              }) 
      recv.capture(limit=None, timeout=None) 
    except (KeyboardInterrupt, SystemExit): 
     print "EXCEPTION KEYBOARD INTERRUPT" 
     sys.exit() 

Questo è tutto. E lo eseguo in un processo diverso rispetto alla normale applicazione, il che significa che creo un processo figlio della mia applicazione di celery che esegue solo questa funzione. HTH

+0

Ciao, grazie, la tua domanda è fondamentalmente ciò che sto cercando di fare ora. Dove stai mettendo questo codice nel tuo progetto Django? Potresti spiegare come creare il processo secondario della tua app di sedano? Al momento la mia app di sedano viene configuita in 'myproj/myproj/celery.py' (come per http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery -with-django) – fpghost

+1

Ciao! Non ho lavorato su questo a lungo, quindi le cose potrebbero essere cambiate in Celery stesso nelle ultime n release. Fondamentalmente stavo iniziando un processo daemon Python come: daemon_process = Process (target = results_processing.my_monitor) daemon_process.daemon = True daemon_process.start() in uno dei moduli che viene chiamato quando l'app inizia – Clara

+0

Stavo usando Django e ha avviato questo monitor attraverso questo. In Django <1.9 ero in grado di iniziare il monitoraggio nel file 'proj/proj/celery.py', solo da' my_monitor (app) 'dopo aver definito l'app di sedano. Ora in Django 1.9 che risulta in un 'AppRegistryNotReady' exc (penso che importare modelli in '__init __. Py' di app ora non sia permesso --- Dovrei notare che il mio monitor si basa su alcuni modelli). Ho finito per avviare il monitor nel metodo 'AppConfig.ready()' dell'app django di cui i miei modelli facevano affidamento sul monitor (questo garantisce che l'app abbia terminato la registrazione). HTH – fpghost

4

Guardatevi un paio di grattacapi

  1. È necessario impostare CELERY_SEND_EVENTS segnala come vero nella tua config sedano.
  2. È anche possibile impostare il monitor eventi in una nuova discussione dal proprio operatore.

Qui è la mia realizzazione:

class MonitorThread(object): 
    def __init__(self, celery_app, interval=1): 
     self.celery_app = celery_app 
     self.interval = interval 

     self.state = self.celery_app.events.State() 

     self.thread = threading.Thread(target=self.run, args=()) 
     self.thread.daemon = True 
     self.thread.start() 

    def catchall(self, event): 
     if event['type'] != 'worker-heartbeat': 
      self.state.event(event) 

     # logic here 

    def run(self): 
     while True: 
      try: 
       with self.celery_app.connection() as connection: 
        recv = self.celery_app.events.Receiver(connection, handlers={ 
         '*': self.catchall 
        }) 
        recv.capture(limit=None, timeout=None, wakeup=True) 

      except (KeyboardInterrupt, SystemExit): 
       raise 

      except Exception: 
       # unable to capture 
       pass 

      time.sleep(self.interval) 

if __name__ == '__main__': 
    app = get_celery_app() # returns app 
    MonitorThread(app) 
    app.start() 
Problemi correlati