2015-08-17 17 views
7

Abbiamo l'installazione di Kafka per essere in grado di elaborare i messaggi in parallelo da diversi server. Ma ogni messaggio deve essere elaborato esattamente una sola volta (e da un solo server). Abbiamo installato e funzionante e funziona bene.Avere un consumer Kafka leggere un singolo messaggio alla volta

Ora, il problema per noi è che i consumatori di Kafka leggono i messaggi in batch per la massima efficienza. Questo porta a un problema se/quando l'elaborazione fallisce, il server si spegne o qualsiasi altra cosa, perché quindi perdiamo i dati che stavano per essere elaborati.

C'è un modo per far leggere al consumatore il messaggio alla volta per consentire a Kafka di conservare i messaggi non elaborati? Qualcosa di simile a; Consumatore tira un messaggio -> processo -> commuta offset al termine, ripeti. È fattibile usando Kafka? Qualche idea/idea?

Grazie!

risposta

0

Si dice di avere esattamente una elaborazione, ma poi si è preoccupati di perdere i dati. Suppongo che tu sia solo preoccupato per il caso limite quando uno dei tuoi server fallisce? E tu perdi i dati?

Non penso che ci sia un modo per realizzare un messaggio alla volta. Guardando attraverso lo consumer configurations, sembra esserci solo un'opzione per impostare i byte massimi che un consumatore può recuperare da Kafka, non il numero di messaggi.

fetch.message.max.bytes 

Ma se siete preoccupati per la perdita di dati del tutto, se non avete mai commettere l'offset Kafka non sarà marchio è come essere impegnata e non saranno persi. Leggendo la documentazione Kafka su delivery semantics,

Così efficacemente Kafka garanzie at-almeno-una volta la consegna per impostazione predefinita e permette all'utente di implementare al massimo una volta la consegna disattivando tentativi del produttore e commettendo il suo offset prima elaborare un gruppo di messaggi. Una consegna precisa richiede una cooperazione con lo del sistema di archiviazione di destinazione, ma Kafka fornisce l'offset che lo rende possibile implementare in modo diretto.

Quindi, per ottenere esattamente una volta l'elaborazione non è qualcosa che Kafka abilita di default. È necessario implementare la memorizzazione dell'offset ogni volta che si scrive l'output del processo in memoria.

Ma questo può essere gestito più semplicemente e in generale, semplicemente lasciando il negozio consumatori suo offset nello stesso posto come la sua uscita ... Come esempio di questo, la nostra ETL Hadoop che popola i dati nei negozi HDFS i suoi offset in HDFS con i dati letti in modo che sia garantito che sia i dati che gli offset siano entrambi aggiornati o nessuno dei due.

Spero che questo aiuti.

+0

Sì, sembra proprio non realmente supportato. Ma grazie per la risposta! :/ –

+0

Prego. Inoltre, se hai trovato che una delle nostre risposte ha risposto o è stata utile, una risposta accettata e/o un voto in su sarebbero apprezzate. – morganw09dev

0

Dipende dal client che si intende utilizzare. Per C++ e python, è possibile consumare il messaggio ONE ogni volta.

Per python, ho usato https://github.com/mumrah/kafka-python.Il codice seguente può consumare un messaggio ogni volta:

message = self.__consumer.get_message(block=False, timeout=self.IterTimeout, get_partition_info=True) 

__consumer è l'oggetto di SimpleConsumer.

Vedi la mia domanda e risposta qui: How to stop Python Kafka Consumer in program?

per C++, io sto usando https://github.com/edenhill/librdkafka. Il seguente codice può consumare un messaggio ogni volta.

214   while(m_bRunning) 
215   { 
216     // Start to read messages from the local queue. 
217     RdKafka::Message *msg = m_consumer->consume(m_topic, m_partition, 1000); 
218     msg_consume(msg); 
219     delete msg; 
220     m_consumer->poll(0); 
221   } 

m_consumer è il puntatore all'oggetto C++ Consumer (C++ API).

Spero che questo aiuto.

+0

Vuoi dire che consuma i messaggi uno alla volta o che in realtà sta tirando un messaggio alla volta da Kafka? Perché c'è una grande differenza. Quello che vogliamo è essere in grado di tirare un messaggio, commettere quell'offset una volta fatto, quindi tirare di nuovo, e così via. –

+0

i consumatori eseguono automaticamente il commit per impostazione predefinita. È possibile impostare la frequenza di commit. In python, commit_energy_n è 100 per impostazione predefinita. A proposito, devi impostare group_id. Ogni messaggio verrà consumato da un solo consumatore nel gruppo. – BAE

+0

È possibile eseguire il commit per programma utilizzando api. Un messaggio è impegnato e sposta lo spostamento dopo averlo consumato. È sempre necessario impostare l'offset iniziale. – BAE

Problemi correlati