2013-02-19 15 views
11

Ho una situazione simile a quella delineata here, ad eccezione del fatto che anziché concatenare attività con più argomenti, voglio concatenare attività che restituiscono un dizionario con più voci.Task chain di Celery e accesso ** kwargs

Questo è - in modo molto impreciso e astrattamente --- quello che sto cercando di fare:

tasks.py

@task() 
def task1(item1=None, item2=None): 
    item3 = #do some stuff with item1 and item2 to yield item3 
    return_object = dict(item1=item1, item2=item2, item3=item3) 
    return return_object 

def task2(item1=None, item2=None, item3=None): 
    item4 = #do something with item1, item2, item3 to yield item4 
    return_object = dict(item1=item1, item2=item2, item3=item3, item4=item4) 
    return return_object 

Lavorare da ipython, sono in grado di chiamare task1 individualmente e in modo asincrono senza problemi

Posso anche chiamare TASK2 individualmente con il risultato restituito da task1 come un argomento stella doppia:

>>res1 = task1.s(item1=something, item2=something_else).apply_async() 
>>res1.status 
'SUCCESS' 
>>res2 = task2.s(**res1.result).apply_async() 
>>res2.status 
'SUCCESS 

Tuttavia, ciò che in ultima analisi, voglio ottenere è lo stesso risultato finale come sopra, ma tramite una catena, e qui, io non riesco a capire come avere TASK2 un'istanza non con (posizionali) argomenti restituiti da task1, ma con task1.result come ** kwargs:

chain_result = (task1.s(item1=something, item2=something_else) | task2.s()).apply_async() #THIS DOESN'T WORK! 

ho il sospetto che io possa tornare indietro e riscrivi i miei compiti in modo che t hey restituisce argomenti posizionali invece di un dizionario, e questo può chiarire le cose, ma mi sembra che ci debba essere un modo per accedere all'oggetto restituito di task1 in task2 con la funzionalità equivalente della ** stella doppia. Sospetto anche che mi manchi qualcosa di abbastanza ovvio riguardo all'implementazione della sottotabella di Celery o * args vs ** kwargs.

Spero che questo abbia senso. E grazie in anticipo per eventuali suggerimenti.

risposta

1

chain e le altre primitive di tela sono nella famiglia di utilità funzionali come map e reduce.

E.g. dove map(target, items) chiamate target(item) per ogni elemento della lista, Python ha una versione usato raramente di carta chiamato itertools.starmap, che chiama invece target(*item).

Mentre potremmo aggiungere starchain e anche kwstarchain alla casella degli strumenti, questi sarebbe molto specializzato e probabilmente non utilizzate più spesso.

È interessante notare che Python ha reso questi inutili con l'elenco e generatore di espressioni, in modo che mappa viene sostituita da [target(item) for item in item] e Starmap con [target(*item) for item in item].

Così, invece di attuare diverse alternative per ogni primitiva Credo che dovremmo concentrarsi sulla ricerca di un modo più flessibile di sostegno di questa, ad esempio, come avere le espressioni di generatore alimentate con sedani (se possibile, e se non qualcosa di altrettanto potente)

+0

Capito. Grazie. Alla fine ho risolto questo problema modificando leggermente gli input/return nel mio task. T2 ora cerca solo un singolo oggetto dict come input, quindi recupera le coppie k/valore attese dal dict per eseguire l'operazione. –

+0

@BenjaminWhite ancora non lo capisco. puoi dirmi come hai fatto questo? – ashim888

1

Poiché questo non è incorporato nel sedano, ho scritto una funzione decoratore a qualcosa di simile.

# Use this wrapper with functions in chains that return a tuple. The 
# next function in the chain will get called with that the contents of 
# tuple as (first) positional args, rather than just as just the first 
# arg. Note that both the sending and receiving function must have 
# this wrapper, which goes between the @task decorator and the 
# function definition. This wrapper should not otherwise interfere 
# when these conditions are not met. 

class UnwrapMe(object): 
    def __init__(self, contents): 
     self.contents = contents 

    def __call__(self): 
     return self.contents 

def wrap_for_chain(f): 
    """ Too much deep magic. """ 
    @functools.wraps(f) 
    def _wrapper(*args, **kwargs): 
     if type(args[0]) == UnwrapMe: 
      args = list(args[0]()) + list(args[1:]) 
     result = f(*args, **kwargs) 

     if type(result) == tuple and current_task.request.callbacks: 
      return UnwrapMe(result) 
     else: 
      return result 
    return _wrapper 

miniera scarta come il concetto starchain, ma si potrebbe facilmente modificarlo per scartare kwargs invece.

5

Questo è il mio prendere il problema, utilizzando una classe astratta compito:

from __future__ import absolute_import 
from celery import Task 
from myapp.tasks.celery import app 


class ChainedTask(Task): 
    abstract = True  

    def __call__(self, *args, **kwargs): 
     if len(args) == 1 and isinstance(args[0], dict): 
      kwargs.update(args[0]) 
      args =() 
     return super(ChainedTask, self).__call__(*args, **kwargs) 

@app.task(base=ChainedTask) 
def task1(x, y): 
    return {'x': x * 2, 'y': y * 2, 'z': x * y}  


@app.task(base=ChainedTask) 
def task2(x, y, z): 
    return {'x': x * 3, 'y': y * 3, 'z': z * 2} 

È ora possibile definire ed eseguire la vostra catena in quanto tale:

from celery import chain 

pipe = chain(task1.s(x=1, y=2) | task2.s()) 
pipe.apply_async()