2015-08-31 7 views
10

Utilizzo di Celery per gestire la pianificazione delle attività in un'applicazione Django che sto sviluppando, sto lavorando con il database Django solo per i test.Consentire l'esecuzione di un'attività se non è già pianificata tramite il sedano

Ho appena provato diverse cose per gestire l'esecuzione di un'attività solo se non è già programmata o in corso come quella proposta in questo article, ma finora nulla ha funzionato.

Qualcosa di simile a questo:

task.py

@task() 
def add(x, y): 
    return x + y 

E poi quando si chiama due volte, come nel seguente modo:

import myapp.tasks.add 

myapp.tasks.add.apply_async((2,2), task_id=1, countdown=15) 
myapp.tasks.add.apply_async((2,2), task_id=2, countdown=15) 

Dovrebbe consentire un'istanza basata su countdown=15. Come posso realizzare che la seconda chiamata non la esegua mai se ce n'è un'altra in esecuzione o in attesa?

risposta

5

Un problema con la risposta accettata è che è lento. Controllare se un'attività è già in esecuzione implica effettuare una chiamata al broker e quindi eseguire iterazioni sia per le attività in esecuzione che per quelle attive. Se si desidera accodare velocemente l'attività, ciò non funzionerà. Anche la soluzione attuale ha una piccola condizione di competizione, in quanto 2 processi potrebbero verificare se l'attività è stata accodata allo stesso modo (scopri che non lo è), che quindi accoderebbe 2 attività.

Una soluzione migliore sarebbe quella che chiamo attività rimbalzate. Fondamentalmente si incrementa un contatore ogni volta che si accoda un task. Quando l'attività inizia decrementa. Usa i redis e poi è tutto atomico.

ad es.

coda il compito:

conn = get_redis() 
conn.incr(key) 
task.apply_async(args=args, kwargs=kwargs, countdown=countdown) 

Poi nel compito, hai 2 opzioni, vuoi per eseguire l'operazione 15 secondi dopo che il primo è stato in coda (farfalla) o eseguirlo 15 secondi dopo l'ultimo è stato messo in coda (antirimbalzo). Cioè, se continuiamo a provare a eseguire lo stesso compito, estendiamo il timer, oppure aspettiamo solo 15 per il primo e ignoriamo le altre attività che sono state accodate.

Facile per supportare sia, ecco antirimbalzo dove ci aspettiamo che i compiti smette di ottenere in coda:

conn = get_redis() 
counter = conn.decr(key) 
if counter > 0: 
    # task is queued 
    return 
# continue on to rest of task 

versione della valvola a farfalla:

counter = conn.getset(key, '0') 
if counter == '0': 
    # we already ran so ignore all the tasks that were queued since 
    return 
# continue on to task 

Un altro vantaggio di questa soluzione sul accettata è che la la chiave è interamente sotto il tuo controllo. Quindi, se si desidera eseguire lo stesso compito, ma solo una volta per id/oggetti diversi, ad esempio, lo si incorpora nella chiave.

Aggiornamento

ciò pensavo ancora di più, si può fare la versione dell'acceleratore ancora più semplice, senza dover fare la coda compiti.

Throttle v2 (quando la fila del compito)

conn = get_redis() 
counter = conn.incr(key) 
if counter == 1: 
    # queue up the task only the first time 
    task.apply_async(args=args, kwargs=kwargs, countdown=countdown) 

Poi nel compito di impostare il contatore torna a 0.

Non hanno nemmeno bisogno di utilizzare un contatore, se aveste un set è possibile aggiungere la chiave al set. Se torni indietro 1, la chiave non era nell'insieme e dovresti fare la coda all'attività. Se torni a 0, la chiave è già nell'insieme quindi non accodare l'attività.

+0

Sì, sono d'accordo, anche se hai detto che il processo di iterazione su tutte le attività in esecuzione è costoso. Nice –

+0

da dove proviene 'get_redis'? –

+0

Questo è il mio metodo che restituisce una connessione redis. – dalore

2

Guarda prima di saltare! È possibile controllare se ci sono attività in esecuzione/in attesa prima di accodare le attività.

from celery.task.control import inspect 

def is_running_waiting(task_name): 
    """ 
    Check if a task is running or waiting. 
    """ 
    scheduled_tasks = inspect().scheduled().values()[0] 
    for task in scheduled_tasks: 
     if task['request']['name'] == task_name: 
      return True 
    running_tasks = inspect().active().values()[0] 
    for task in running_tasks: 
     if task['request']['name'] == task_name: 
      return True 

Ora, se si coda tre aggiungere attività, prima sarà in coda per l'esecuzione, rimanendo non sarà in coda.

for i in range(3): 
    if not is_running_waiting('add'): 
     add.apply_async((2,2), countdown=15) 
Problemi correlati