2015-04-30 12 views
7

Sto eseguendo Django, Celery e RabbitMQ. Quello che sto cercando di realizzare è quello di garantire che le attività relative ad un utente vengono eseguiti in ordine (in particolare, uno alla volta, non voglio compito concorrenza per utente)Come garantire l'ordine di esecuzione dell'attività per utente utilizzando Celery, RabbitMQ e Django?

  • ogni volta che viene aggiunto nuovo compito per l'utente, dovrebbe dipendere dall'ultima operazione aggiunta. Funzionalità aggiuntive potrebbero includere l'aggiunta di attività alla coda, se l'attività di questo tipo è in coda per questo utente e non è ancora stata avviata.

Ho fatto qualche ricerca e:

  • non riuscivo a trovare un modo per collegare compito appena creato con già in coda uno in Sedano sé, catene sembrano essere in grado di collegare nuovi compiti solo .
  • Penso che entrambe le funzionalità siano possibili da implementare con il gestore di messaggi RabbitMQ personalizzato, anche se potrebbe essere difficile codificarlo dopo tutto.
  • Ho anche letto su celery-tasktree e questo potrebbe essere un modo più semplice per garantire l'ordine di esecuzione, ma come faccio a collegare la nuova attività con già "applied_async" task_tree o coda? Esiste un modo per implementare questa funzionalità aggiuntiva non duplicata utilizzando questo pacchetto?

Edit: C'è questo anche questo "blocco" esempio in celery cookbook e come il concetto va bene, non riesco a vedere un possibile modo per farlo funzionare come previsto nel mio caso - semplicemente se non ci riesco acquisire il blocco per l'utente, il compito dovrebbe essere ripetuto, ma questo significa spingerlo fino alla fine della coda.

Quale sarebbe la migliore linea d'azione qui?

+0

Suppongo che tu non sappia quali sono le attività per un particolare utente prima di inserire l'attività? –

+0

Perché non creare una coda da soli (per utente) e fare in modo che Celery esegua le attività da lì? – trinchet

risposta

0

Se si configurano gli operatori di sedano in modo che possano eseguire solo un'attività alla volta (vedere l'impostazione worker_concurrency), è possibile applicare la concorrenza di cui si ha bisogno per ciascun utente. Usando un metodo come

NUMBER_OF_CELERY_WORKERS = 10 

def get_task_queue_for_user(user): 
    return "user_queue_{}".format(user.id % NUMBER_OF_CELERY_WORKERS) 

per ottenere la coda compito in base alla user id, ogni compito verrà assegnato alla stessa coda per ogni utente. I lavoratori dovrebbero essere configurati per consumare solo le attività da una singola coda di attività.

Sarebbe giocare fuori in questo modo:

  1. utente 49 innesca un compito

  2. Il compito viene inviato a user_queue_9

  3. Quando l'unico e solo lavoratore sedano che sta ascoltando user_queue_9 è pronto a consumare una nuova attività, l'attività viene eseguita

Questa è una risposta hacky però, perché

  • che richiede solo lavoratore sedano unico per ogni coda è un sistema fragile - se il lavoratore sedano si ferma, l'intera coda di ferma

  • gli operai sono in esecuzione inefficiente

+0

Sulla base di ciò, perché non dovresti utilizzare solo dieci bucket e mappare in coda con "user.id% 10" o nel caso di una stringa, "hash (user.id)% 10"? – knipknap

+0

Grazie, hai reso conto che non ho risolto la parte relativa alla concorrenza della domanda - la mia risposta al momento non è ancora corretta. –

Problemi correlati