2015-10-02 11 views
8

Usiamo Storm con il becco Kafka. Quando falliamo i messaggi, vorremmo rieseguirli, ma in alcuni casi dati errati o errori di codice faranno sì che i messaggi falliscano sempre in un Bolt, quindi entreremo in un ciclo di riproduzione infinito. Ovviamente stiamo correggendo gli errori quando li troviamo, ma vorremmo che la nostra topologia fosse generalmente tollerante ai guasti. Come possiamo ack() una tupla dopo che è stata riprodotta più di N volte?Numero massimo di ripetizioni della tupla su Storm Kafka Spout

Guardando attraverso il codice per il Kafka becco, vedo che è stato progettato per riprovare con un esponenziale backoff timer e la comments on the PR Stato:

"Il becco non termina il ciclo di tentativi (è mia convinzione che non dovrebbe farlo, perché non può riportare il contesto sull'errore che è accaduto per abortire il reqeust), gestisce solo il ritardo dei tentativi. Un bullone nella topologia dovrebbe ancora chiamare ack() invece di fail() per fermarsi il ciclo."

Ho visto risposte StackOverflow che raccomandano di scrivere un beccuccio personalizzato, ma preferirei non rimanere bloccato mantenendo una patch personalizzata degli interni del becco Kafka se c'è un modo consigliato di farlo in un bullone.

Qual è il modo giusto per farlo in un bullone? Non vedo nessuno stato nella tupla che espone quante volte è stato riprodotto.

+1

Se si verifica un errore nel bullone in cui è possibile concludere che la particolare tupla è "cattiva" secondo la logica aziendale, è possibile "ack" invece di non riuscire .... quindi non verrà riprodotto .. ... –

risposta

5

Storm non fornisce alcun supporto per il problema. Pertanto, una soluzione personalizzata è l'unica strada da percorrere. Anche se non si desidera patch KafkaSpout, penso che l'introduzione di un contatore e interruzione del ciclo di replay in esso, sarebbe l'approccio migliore. In alternativa, potresti anche ereditare da KafkaSpout e inserire un contatore nella sottoclasse. Questo è ovviamente simile a una patch, ma potrebbe essere meno intrusivo e più facile da implementare.

Se si desidera utilizzare un bullone, è possibile eseguire quanto segue (che richiede anche alcune modifiche allo KafkaSpout o una sottoclasse di esso).

  • assegnare un ID univoco come attributo aggiuntivo per ogni tupla (forse, c'è già un ID univoco a disposizione, in caso contrario, si potrebbe introdurre una "contro-ID" o semplicemente l'intera tupla, vale a dire, tutti gli attributi, identificare ogni tupla).
  • Inserire un bullone dopo KafkaSpout tramite fieldsGrouping sull'ID (per garantire che una tupla riprodotta sia trasmessa in streaming alla stessa istanza di bullone).
  • All'interno del bullone, utilizzare un HashMap<ID,Counter> che bufferizza tutte le tuple e conta il numero di tentativi (ripresi). Se il contatore è inferiore al valore di soglia, inoltrare la tupla di input in modo che venga elaborata dalla topologia effettiva che segue (ovviamente, è necessario ancorare la tupla in modo appropriato). Se il conteggio è maggiore della soglia, accettare la tupla per interrompere il ciclo e rimuovere la relativa voce dal HashMap (è possibile anche registrare tutte le tuple non riuscite).
  • Per rimuovere le tuple elaborate con successo da HashMap, ogni volta che una tupla è attivata in KafkaSpout è necessario inoltrare l'ID tupla al bullone in modo che possa rimuovere la tupla da HashMap. Basta dichiarare un secondo stream di output per la sottoclasse KafkaSpout e sovrascrivere Spout.ack(...) (ovviamente è necessario chiamare super.ack(...) per assicurarsi che anche KafkaSpout ottenga l'ack).

Questo approccio potrebbe tuttavia consumare molta memoria.In alternativa ad avere una voce per ogni tupla nello HashMap, potresti anche utilizzare un terzo flusso (che è collegato al bullone come gli altri due) e inoltrare un ID tupla se una tupla non riesce (ad esempio, in Spout.fail(...)). Ogni volta, il bullone riceve un messaggio "fallito" da questo terzo flusso, il contatore aumenta. Finché nessuna voce è nello HashMap (o la soglia non viene raggiunta), il bullone semplicemente inoltra la tupla per l'elaborazione. Questo dovrebbe ridurre la memoria utilizzata ma richiede un po 'più di logica da implementare nel beccuccio e nel bullone.

Entrambi gli approcci hanno lo svantaggio, che ogni tupla inviata risulta in un messaggio aggiuntivo al nuovo bullone introduttivo (aumentando così il traffico di rete). Per il secondo approccio, potrebbe sembrare che tu debba solo inviare un messaggio "ack" al bullone per le tuple che hanno fallito prima. Tuttavia, non sai quali tuple hanno fallito e quali no. Se si desidera eliminare questo overhead di rete, è possibile introdurre un secondo HashMap in KafkaSpout che bufferizza gli ID dei messaggi non riusciti. Pertanto, è possibile inviare un messaggio "ack" solo se una tupla non riuscita è stata riprodotta correttamente. Naturalmente, questo terzo approccio rende la logica da implementare ancora più complessa.

Senza alcuna modifica di KafkaSpout, non vedo alcuna soluzione per il problema. Personalmente patchi KafkaSpout o userei il terzo approccio con un HashMap nella sottoclasse KafkaSpout e il bullone (perché ha consumato poca memoria e non ha messo un sacco di carico aggiuntivo sulla rete rispetto alle prime due soluzioni).

+0

è il metodo fail() nello spout chiamato da un singolo thread? Sto solo cercando di determinare se ho bisogno di avere una ConcurrentHashMap per tenere traccia di msgIds-> errorCnt o una semplice HashMap <> farebbe. grazie – user3169330

+0

'nextTuple()', 'ack()' e 'fail()' sono chiamati da un singolo thread. Usare 'HashMap' è sufficiente. Vedi qui per ulteriori dettagli: https://stackoverflow.com/questions/32547935/why-should-i-not-loop-or-block-in-spout-nexttuple –

+0

un'altra cosa, se ho N spout, fallisce() metodo per uno specifico msgId, viene chiamato sul server SAME/Spout? – user3169330

0

In pratica funziona così:

  1. Se si distribuisce topologie di essi dovrebbero essere di grado di produzione (questo è, ci si aspetta un certo livello di qualità, e il numero di tuple bassi).
  2. Se una tupla non riesce, controllare se la tupla è effettivamente valida.
  3. Se una tupla è valida (ad esempio non è stato inserito perché non è possibile connettersi a un database esterno o qualcosa del genere), rispondere.
  4. Se una tupla è miss-formata e non può mai essere gestita (ad esempio un id di database che è testo e il database è in attesa di un intero) dovrebbe essere ack, non sarà mai in grado di correggere tale cosa o inserirla in il database.
  5. È necessario registrare nuovi tipi di eccezioni (nonché i contenuti della tupla stessa). Dovresti controllare questi registri e generare la regola per convalidare le tuple in futuro. E infine aggiungere il codice per elaborarli correttamente (ETL) in futuro.
  6. Non registrare tutto, altrimenti i file di registro saranno enormi, essere molto selettivi su ciò che si registra. Il contenuto dei file di log dovrebbe essere utile e non un mucchio di spazzatura.
  7. Continua a farlo e alla fine coprirai solo tutti i casi.
0

Ci troviamo di fronte anche a dati simili in cui si verificano dati errati che causano il fallimento del bullone all'infinito.

Per risolvere questo problema in runtime, abbiamo introdotto un altro bullone che lo identifica come "DebugBolt" come riferimento. Quindi lo spout invia il messaggio a questo bullone prima e poi questo bullone esegue la correzione dei dati richiesta per i messaggi non validi e quindi li emette nel bullone richiesto. In questo modo è possibile correggere gli errori di dati al volo.

Inoltre, se è necessario eliminare alcuni messaggi, si può effettivamente passare ignoreFlag dal tuo DebugBolt al tuo Bolt originale e il tuo bullone originale dovrebbe semplicemente inviare un ack a beccuccio senza elaborazione se ignoreFlag è True.

0

Abbiamo semplicemente fatto in modo che il nostro bullone emettesse la tupla errata su un flusso di errore e l'avesse attivata. Un altro bullone ha gestito l'errore riportandolo su un argomento di Kafka appositamente per gli errori. Questo ci consente di dirigere facilmente il flusso di dati normale rispetto a quello di errore attraverso la topologia.

L'unico caso in cui si fallisce una tupla è perché alcune risorse richieste sono offline, come una connessione di rete, DB, ... Questi sono errori risolvibili. Tutto il resto viene indirizzato al flusso di errori per essere risolto o gestito come appropriato.

Tutto ciò presuppone naturalmente che non si vogliano perdite di dati. Se vuoi solo tentare il massimo sforzo e ignorare dopo alcuni tentativi, allora guarderei altre opzioni.