2014-11-20 24 views
7

Sto costruendo un'applicazione che consente di aggiungere e rimuovere in modo dinamico gli abbonamenti agli argomenti di kafka. Quando viene aggiunta una sottoscrizione argomento, desidero eseguire un processo batch ogni ora che recupera tutti i nuovi messaggi e li spinge in un altro archivio dati.Kafka - Il modo più semplice per ottenere l'ultima correzione

Quello che voglio capire è come ottenere l'attuale offset di un argomento. Non appena viene aggiunta una sottoscrizione, desidero che il successivo processo batch riceva tutti i messaggi dal momento approssimativo dell'abbonamento.

Ad esempio, immagino di avere un argomento chiamato "ArgomentoA" che riceve costantemente messaggi. Se aggiungo un abbonamento alle 19:15, quando il lavoro in batch viene eseguito alle 20:00, desidero che tutti i messaggi vengano caricati dalle 7.15pm. Sono felice che il tempo sia approssimativo - 7,10, 7,20, ecc. 5 o 10 minuti su entrambi i lati non mi preoccupa.

Quindi la mia soluzione è quella di ottenere l'attuale offset di un argomento nel momento in cui viene aggiunto un abbonamento. Ho guardato il consumatore semplice, ma non voglio essere coinvolto in tutti gli aspetti del managemnet del cluster per questo caso d'uso di base.

Ho anche guardato il consumatore di alto livello. Potrei qualcosa di simile:

consumer.createMessageStreamsByFilter(new Whitelist(topicName)).head.head.offset 

Ciò che mi preoccupa di questo approccio è che la chiamata a "capo" è in realtà un flusso. Quindi credo che bloccherà in attesa del prossimo messaggio. Il blocco è problematico perché potrebbe causare l'accodamento di altre sottoscrizioni fino all'arrivo del messaggio successivo.

Sono felice di dedicare un po 'di tempo all'implementazione del secondo approccio, ma se c'è un modo più semplice che non richiede di scrivere codice concorrente soggetto a errore, allora preferirei non sprecare il mio tempo.

Avrò anche bisogno di un modo per ottenere tutti i registri da quell'offset.

risposta

2

Ogni risposta a una richiesta di recupero restituisce un "HighWaterMark" che rappresenta l'ultimo offset nel log della partizione attualmente in uso. Quindi, in teoria, è possibile recuperare il messaggio meno recente o qualsiasi messaggio (supponendo che ne esista uno) per un determinato argomento e estrarre HighWaterMark dalla risposta. C'è di più dettagli sulla HighWaterMark qui: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse

Naturalmente, essendo in grado di tirare la HighWaterMarkOffset dalla risposta dipende dal vostro client che effettua che i dati disponibili attraverso il proprio Kafka API.

+0

Questo sarebbe il limite massimo per una partizione particolare. Penso che stia chiedendo informazioni sul "ultimo messaggio" {partitionId, offsetId}. – arviman

+1

Penso che non esista un "ultimo messaggio" globale. Kafka non scalerebbe se avesse un meccanismo di sincronizzazione globale ... –

Problemi correlati