Un problema con la risposta accettata è che è lento. Controllare se un'attività è già in esecuzione implica effettuare una chiamata al broker e quindi eseguire iterazioni sia per le attività in esecuzione che per quelle attive. Se si desidera accodare velocemente l'attività, ciò non funzionerà. Anche la soluzione attuale ha una piccola condizione di competizione, in quanto 2 processi potrebbero verificare se l'attività è stata accodata allo stesso modo (scopri che non lo è), che quindi accoderebbe 2 attività.
Una soluzione migliore sarebbe quella che chiamo attività rimbalzate. Fondamentalmente si incrementa un contatore ogni volta che si accoda un task. Quando l'attività inizia decrementa. Usa i redis e poi è tutto atomico.
ad es.
coda il compito:
conn = get_redis()
conn.incr(key)
task.apply_async(args=args, kwargs=kwargs, countdown=countdown)
Poi nel compito, hai 2 opzioni, vuoi per eseguire l'operazione 15 secondi dopo che il primo è stato in coda (farfalla) o eseguirlo 15 secondi dopo l'ultimo è stato messo in coda (antirimbalzo). Cioè, se continuiamo a provare a eseguire lo stesso compito, estendiamo il timer, oppure aspettiamo solo 15 per il primo e ignoriamo le altre attività che sono state accodate.
Facile per supportare sia, ecco antirimbalzo dove ci aspettiamo che i compiti smette di ottenere in coda:
conn = get_redis()
counter = conn.decr(key)
if counter > 0:
# task is queued
return
# continue on to rest of task
versione della valvola a farfalla:
counter = conn.getset(key, '0')
if counter == '0':
# we already ran so ignore all the tasks that were queued since
return
# continue on to task
Un altro vantaggio di questa soluzione sul accettata è che la la chiave è interamente sotto il tuo controllo. Quindi, se si desidera eseguire lo stesso compito, ma solo una volta per id/oggetti diversi, ad esempio, lo si incorpora nella chiave.
Aggiornamento
ciò pensavo ancora di più, si può fare la versione dell'acceleratore ancora più semplice, senza dover fare la coda compiti.
Throttle v2 (quando la fila del compito)
conn = get_redis()
counter = conn.incr(key)
if counter == 1:
# queue up the task only the first time
task.apply_async(args=args, kwargs=kwargs, countdown=countdown)
Poi nel compito di impostare il contatore torna a 0.
Non hanno nemmeno bisogno di utilizzare un contatore, se aveste un set è possibile aggiungere la chiave al set. Se torni indietro 1, la chiave non era nell'insieme e dovresti fare la coda all'attività. Se torni a 0, la chiave è già nell'insieme quindi non accodare l'attività.
Sì, sono d'accordo, anche se hai detto che il processo di iterazione su tutte le attività in esecuzione è costoso. Nice –
da dove proviene 'get_redis'? –
Questo è il mio metodo che restituisce una connessione redis. – dalore