2013-02-07 16 views
7

Ho letto la documentazione sul sito Web Kafka ma dopo aver tentato di implementare un esempio minimale completo (produttore -> kafka -> consumer) non mi è molto chiaro come lo "stato consumatore", il l'offset deve essere gestito.Apache Kafka: stato consumatore

Alcune informazioni

  1. Sto utilizzando l'API ad alto livello (Java)
  2. mia consumatore è una semplice classe con un principale, fondamentalmente la stessa che si trova sulla "quickstart" Kafka pagina
  3. sto usando Zookeeper
  4. sto usando un singolo mediatore

Ora, la documentazione dice che la stor consumatori API HighLevel es suo stato utilizzando Zookeeper così mi aspetterei l'offset e quindi lo stato del consumatore sarebbe stato mantenuto tra

  • Kafka mediatore riavvia
  • consumatori riavvia

Ma purtroppo non è così: ogni Quando riavvio il broker o il consumatore, tutti i messaggi vengono riconsegnati. Ora, probabilmente queste sono domande stupide, ma

  1. In caso di Kafka riavviare: ho capito che è fino al consumatore per mantenere il suo stato in modo probabilmente quando l'intermediario (ri) avvia riconsegnare tutto (!) messaggi e il consumatore decide cosa consumare ... è giusto? Se sì, cosa succede se ho 10,0000,0000 di messaggi?

  2. In caso di riavvio del consumer JVM: se lo stato viene mantenuto su Zookeeper perché i messaggi vengono nuovamente consegnati? È possibile che la nuova JVM abbia una "identità" di consumatore diversa? E in questo caso, come posso legare la precedente identità?

risposta

2

Sembra che sia stato un cattivo lettore ... è tutto nella pagina di configurazione. Nello specifico, entrambe le mie domande sono state risolte impostando un flag "autooffset.reset" che per impostazione predefinita è "il più piccolo" e quindi causa gli effetti descritti.

Ora, con "il più grande" come valore, le cose funzionano come previsto, sia in caso di riavvio del consumatore che del broker, perché l'offset è sempre il più grande.

4

Sì, consumatore è responsabile di mantenere il suo stato, e Java di alto livello dei consumatori salva il suo stato in Zookeeper.

Molto probabilmente non è stata specificata la proprietà di configurazione groupId. In questa situazione kafka genera un numero casuale groupId.

È inoltre possibile che sia stata disattivata la proprietà di configurazione autocommit.enable.

È possibile trovare il riferimento completo della configurazione Kafka in questa pagina: http://kafka.apache.org/configuration.html in "Proprietà di configurazione importanti per il consumer di livello superiore" titolo.

4

per rispondere alla domanda iniziale: usando groupId consente di evitare il "re-consumare tutti i messaggi dall'inizio del tempo" situazione

se si cambia il groupId avrai tutti i messaggi dal momento in cui è stata creata la coda (o dall'ultima cancellazione dei dati basata sul criterio di conservazione dei registri di kafka)

non confondere questo con kafka-console-consumer "--from-beginning" flag (che imposta l'opzione auto.offset.reset) che è lì per scegliere tra le opzioni 1 e 2 di seguito:

1) consumare nuovi messaggi dal momento in cui l'ultimo messaggio è stato consumato (N OT dall'inizio del tempo in cui la coda di kafka è stata originariamente creata):

props.put ("auto.offset.reset", "smallest");

2) consumano i nuovi messaggi dal momento abbonato JVM viene avviata (in questo caso si rischia di messaggi mancanti messi in coda mentre abbonato è sceso e non ascolto alla coda):

props.put ("auto.offset.reset", "il più grande");


nota a margine: sotto è solo marginalmente legato alla domanda originale

per un caso d'uso più avanzato - se si sta cercando di impostare programatically consumatori offset per riprodurre i messaggi a partire da certa ora - Richiederebbe l'utilizzo dell'API SimpleConsumer come mostrato in https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example per trovare l'offset più piccolo da riprodurre dal broker/partizione di destra. Che essenzialmente sostituisce lo zookeeper con la nostra logica di FindLeader. molto difficile.

per questo caso d'uso (riproduzione ad-hoc dei messaggi a partire da un determinato tempo specificato dall'utente) abbiamo deciso di memorizzare la cache locale dei messaggi e gestire gli offset localmente invece di usare kafka offset management api (che richiederebbe reimplementare un buon pezzo di funzionalità zookeeper con SimpleConsumer).

I.e. considera kafka come un "postino", una volta che il messaggio è stato consegnato, va alla casella di posta locale e nel caso dovessimo tornare a un certo offset nel passato e, ad esempio, riprodurre i messaggi (che sono già stati consumati) per es. in caso di errore di un'app consumer, non torniamo all'ufficio postale (kafka brokers) per capire l'ordine di consegna corretto, ma gestiscilo localmente.

fine della nota a margine

+0

Potrebbe elaborare su come gestire gli offset a livello locale invece che da Kafka? Ad esempio, come si determinano e si calcolano gli offset per ciascun messaggio inviato per essere poi consumati. – David

+0

una volta consumato - aggiungi il timestamp attuale come ID del messaggio e memorizza il messaggio come blob binario (viene inviato in formato avro e non lo deserializziamo a questo punto) in hsql (con persistenza su disco), oppure puoi usare apache phoenix e archivio c'è in formato binario con due colonne ID (timestamp), Message (VARBINARY) – alex

+0

Ma come si riferisce all'offset del messaggio? Il valore di offset di Kafka non è un timestamp o una codifica binaria del messaggio o dell'hash di entrambi? Sono ancora nuovo di Kafka, quindi perdona la mia ignoranza. – David

Problemi correlati