2012-07-16 20 views
13

Ho una catena di sedici che esegue alcune attività. Ognuno dei compiti può fallire ed essere riprovato. Si prega di vedere sotto per un esempio veloce:Riprovare le attività fallite di sedano che fanno parte di una catena

from celery import task 

@task(ignore_result=True) 
def add(x, y, fail=True): 
    try: 
     if fail: 
      raise Exception('Ugly exception.') 
     print '%d + %d = %d' % (x, y, x+y) 
    except Exception as e: 
     raise add.retry(args=(x, y, False), exc=e, countdown=10) 

@task(ignore_result=True) 
def mul(x, y): 
    print '%d * %d = %d' % (x, y, x*y) 

e la catena:

from celery.canvas import chain 
chain(add.si(1, 2), mul.si(3, 4)).apply_async() 

Esecuzione dei due compiti (e supponendo che nulla riesce), il vostro otterrebbe/si veda stampata:

1 + 2 = 3 
3 * 4 = 12 

Tuttavia, quando l'attività di aggiunta ha esito negativo la prima volta e riesce nelle successive chiamate di nuovo tentativo, il resto delle attività nella catena non viene eseguito, ovvero l'attività di aggiunta non riesce, tutte le altre attività della catena non vengono eseguite e dopo un ew secondi, l'attività di aggiunta viene eseguita nuovamente e ha esito positivo e il resto delle attività nella catena (in questo caso mul.si (3, 4)) non viene eseguito.

Il sedano fornisce un modo per continuare le catene guaste dall'attività che ha avuto esito negativo, in seguito? In caso contrario, quale sarebbe l'approccio migliore per ottenere questo risultato e assicurarsi che le attività di una catena vengano eseguite nell'ordine specificato e solo dopo che l'attività precedente è stata eseguita correttamente anche se l'attività viene ripetuta alcune volte?

Nota 1: Il problema può essere risolto facendo

add.delay(1, 2).get() 
mul.delay(3, 4).get() 

ma sono interessati a capire il motivo per cui le catene non funzionano con attività non riuscite.

risposta

0

Sono anche interessato a capire perché le catene non funzionano con attività non riuscite.

I scavare un certo codice sedano e quello che ho trovato finora è:

L'implementazione happends a app.builtins.py

@shared_task 
def add_chain_task(app): 
    from celery.canvas import chord, group, maybe_subtask 
    _app = app 

    class Chain(app.Task): 
     app = _app 
     name = 'celery.chain' 
     accept_magic_kwargs = False 

     def prepare_steps(self, args, tasks): 
      steps = deque(tasks) 
      next_step = prev_task = prev_res = None 
      tasks, results = [], [] 
      i = 0 
      while steps: 
       # First task get partial args from chain. 
       task = maybe_subtask(steps.popleft()) 
       task = task.clone() if i else task.clone(args) 
       i += 1 
       tid = task.options.get('task_id') 
       if tid is None: 
        tid = task.options['task_id'] = uuid() 
       res = task.type.AsyncResult(tid) 

       # automatically upgrade group(..) | s to chord(group, s) 
       if isinstance(task, group): 
        try: 
         next_step = steps.popleft() 
        except IndexError: 
         next_step = None 
       if next_step is not None: 
        task = chord(task, body=next_step, task_id=tid) 
       if prev_task: 
        # link previous task to this task. 
        prev_task.link(task) 
        # set the results parent attribute. 
        res.parent = prev_res 

       results.append(res) 
       tasks.append(task) 
       prev_task, prev_res = task, res 

      return tasks, results 

     def apply_async(self, args=(), kwargs={}, group_id=None, chord=None, 
       task_id=None, **options): 
      if self.app.conf.CELERY_ALWAYS_EAGER: 
       return self.apply(args, kwargs, **options) 
      options.pop('publisher', None) 
      tasks, results = self.prepare_steps(args, kwargs['tasks']) 
      result = results[-1] 
      if group_id: 
       tasks[-1].set(group_id=group_id) 
      if chord: 
       tasks[-1].set(chord=chord) 
      if task_id: 
       tasks[-1].set(task_id=task_id) 
       result = tasks[-1].type.AsyncResult(task_id) 
      tasks[0].apply_async() 
      return result 

     def apply(self, args=(), kwargs={}, **options): 
      tasks = [maybe_subtask(task).clone() for task in kwargs['tasks']] 
      res = prev = None 
      for task in tasks: 
       res = task.apply((prev.get(),) if prev else()) 
       res.parent, prev = prev, res 
      return res 
    return Chain 

Si può vedere che alla fine prepare_stepsprev_task è collegato al prossimo compito. Quando prev_task non è riuscito, l'attività successiva non viene chiamata.

sto testando con l'aggiunta del link_error dal compito prev alla successiva:

if prev_task: 
    # link and link_error previous task to this task. 
    prev_task.link(task) 
    prev_task.link_error(task) 
    # set the results parent attribute. 
    res.parent = prev_res 

Ma poi, il passo successivo deve prendersi cura di entrambi i casi (forse, se non quando è configurato per essere immutabile, ad esempio, non accettare più argomenti).

Credo catena può sostenere che, consentendo una sintassi piace questo:

c = chain(t1, (t2, t1e), (t3, t2e))

che significa:

t1link a t2 e link_error a t1e

t2link a t3 e link_error-t2e

+0

Ho deciso di utilizzare un'attività a catena che esegue tutte le attività che altrimenti si troverebbero in una catena, ma attende l'esecuzione di un'attività prima di avviare l'altra, ad es .: 'task1.delay ([params]). ottenere(); . Task2.delay ([params]) get(); task3.delay ([params]). get() '. L'attività concatenata può rilevare le eccezioni sollevate da una qualsiasi attività e riprovare se stessa. – Andrei

+0

Quindi dal tuo esempio, t1e e t2e dovrebbero chiamare t2 e, rispettivamente, t3, giusto? – Andrei

+0

L'esempio sono solo i miei pensieri sulla possibile sintassi per catena. Significa che ogni attività successiva ora è effettivamente una coppia di attività, il primo elemento della coppia verrà chiamato se nessuna eccezione/errore si verifica nel passaggio precedente e il secondo elemento è il gestore di eccezioni/errori per l'errore del passaggio precedente. 't1e' significa' gestore di errori t1' Le catene – anh

Problemi correlati