2011-07-05 13 views
6

Mi piacerebbe avere le attività di Celery che dipendono dal risultato di 2 o più altre attività. Ho esaminato Python+Celery: Chaining jobs? e http://pypi.python.org/pypi/celery-tasktree, ma sono utili solo se le attività hanno solo un'attività dipendente.Esecuzione delle attività di Celery con il grafico delle dipendenze

Conosco TaskSet, ma non sembra essere un modo per eseguire immediatamente una richiamata quando TaskSetResult.ready() diventa True. Quello che ho in mente in questo momento è di avere un'attività periodica che esegue il polling di TaskSetResult.ready() ogni pochi [milli] secondi circa e attiva la richiamata non appena restituisce True, ma suona piuttosto poco elegante per me.

Qualche suggerimento?

risposta

2

mrbox è vero, si può riprovare fino a quando i risultati sono pronti, ma non è così chiara nella documentazione che quando si riprovare si deve passare il setId e gli elementi sottoattività, e per il recupero che è necessario utilizzare la mappa funzione, sotto c'è un codice di esempio per spiegare cosa intendo.

def run(self, setid=None, subtasks=None, **kwargs): 

    if not setid or not subtasks: 
     #Is the first time that I launch this task, I'm going to launch the subtasks 
     … 
     tasks = [] 
     for slice in slices: 
      tasks.append(uploadTrackSlice.subtask((slice,folder_name))) 

     job = TaskSet(tasks=tasks) 
     task_set_result = job.apply_async() 
     setid = task_set_result.taskset_id 
     subtasks = [result.task_id for result in task_set_result.subtasks] 
     self.retry(exc=Exception("Result not ready"), args=[setid,subtasks]) 

    #Is a retry than we just have to check the results   
    tasks_result = TaskSetResult(setid, map(AsyncResult,subtasks)) 
    if not tasks_result.ready(): 
     self.retry(exc=Exception("Result not ready"), args=[setid,subtasks]) 
    else:  
     if tasks_result.successful(): 
      return tasks_result.join() 
     else: 
      raise Exception("Some of the tasks was failing") 
2

IMHO si può fare qc simile alla cosa fatta in docs- metodo tentativi link

Oppure si può utilizzare con max_retries = Nessuno - se uno dei compiti della 'base' .ready() è falso, è possibile metodo fire .retry() fino al completamento di entrambe le attività di base.

7

Nelle recenti versioni di sedano (3.0+) è possibile utilizzare un cosiddetto accordo per ottenere l'effetto desiderato:

Da http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives:

semplice corda

La primitiva degli accordi ci consente di aggiungere callback da chiamare quando tutte le delle attività di un gruppo hanno terminato l'esecuzione, che è spesso richiesta per gli algoritmi che a non sono imbarazzante parallelo:

>>> from celery import chord 
>>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())() 
>>> res.get() 
90 

Disclaimer: non ho ancora provato questo io stesso.

Problemi correlati