2013-05-16 12 views
5

Nella topologia Storm, durante l'elaborazione di uno stream, desidero ritardare l'elaborazione di alcuni messaggi fino ad alcuni punti futuri nel tempo. Quali sono alcune opzioni ragionevoli per farlo?Elaborazione ritardata coda/messaggio in Storm

Finora, ho pensato al seguente:

  • Utilizzo di Java di Thread.sleep. (Tuttavia, sulla base di alcune discussioni, questo non è un metodo consigliato per utilizzare efficacemente le risorse di Storm.)
  • Utilizzare una coda ritardata ...
  • Storm ha qualche API per ritardare un messaggio che ho trascurato?
  • ZeroMQ fornisce un'API di messaggistica in ritardo che Storm (se modificata) potrebbe sfruttare?
+0

puoi dare qualche idea del perché vuoi farlo? se non sei pronto per elaborare questa roba, perché stai passando alla topologia della tempesta per cominciare? –

+2

La mia prima risposta: perché chiedere perché? È importante capire o rispondere alla domanda? Ci sono molti motivi per cui è utile ritardare (o riprogrammare) una tupla. La mia elaborazione di tuple non riguarda esclusivamente trasformazioni puramente funzionali dei dati. Nel mio caso, elaborare una tupla implica catturare lo stato di qualcosa al di fuori del sistema e integrarlo con altri flussi. Dal momento che cambia nel tempo, voglio catturare quello stato a intervalli controllati. Uno di questi è non consumare troppo spesso una risorsa esterna. –

risposta

2

Utilizzare una coda messaggi esterna per implementare una coda di ritardo.

Dal Storm è fault-tolerant e distribuito in orizzontale, non avrebbe molto senso per prendere una coda di messaggi che si adatta quello stile, come ad esempio:

  • Kafka
  • Amazon SQS
  • RabbitMQ
5

Utilizziamo tuple di topologia tick per elaborare in blocco le tuple in sospeso. In pratica li memorizza semplicemente in memoria su ogni tupla normale e quando riceve una tupla di tick li elabora in memoria/indicizzazione utilizzando l'elaborazione bulk/pipeline.

Utilizziamo anche redis nei casi in cui abbiamo enormi picchi di volume, se un picco di volume ha rilevato tutte le tuple reindirizzate all'archiviazione redis locale su ciascuno degli host e quindi vengono reinserite nell'elaborazione della topologia dopo che il volume si è esaurito. La nostra situazione potrebbe non essere applicabile al tuo, solo il mio 2c.