2015-05-20 25 views
9

Sto provando a utilizzare l'API consumer di basso livello per gestire gli offset manualmente, con l'ultimo kafka_2.10-0.8.2.1. Per verificare che gli offset che ho commesso/letto da Kafka siano corretti, io uso lo strumento kafka.tools.ConsumerOffsetChecker.Chiarificazione operazioni di offset API Kafka Java

Ecco un esempio di output per il mio argomento di gruppo/consumatore:

./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group elastic_search_group --zookeeper localhost:2181 --topic my_log_topic 
Group           Topic                          Pid Offset          logSize         Lag             Owner 
elastic_search_group my_log_topic              0   5               29              24              none 

Ecco la mia interpretazione del risultato:

Offset = 5 -> questo è l'attuale scostamento di il mio consumatore 'elastic_search_group'

logsize = 29 -> questo è l'ultimo di offset - l'offset del messaggio successivo che verrà a questo tema/partizione

Lag = 24 -> 29-5 - quanti messaggi non sono ancora elaborati dal mio consumatore 'elastic_search_group'

Pid - partizione ID

Q1: è corretto?

Ora, voglio ottenere le stesse informazioni dal mio utente Java. Qui ho scoperto che dovevo utilizzare due API diverse:

kafka.javaapi. OffsetRequest per ottenere offset più recenti e più recenti, ma kafka.javaapi. OffsetFetchRequest per ottenere l'offset corrente.

Per arrivare non prima (o più recente) di offset che faccio:

TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition); 
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); 
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1)); 
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1)); 
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); 
OffsetResponse response = simpleConsumer.getOffsetsBefore(request); 
long[] offsets = response.offsets(topic, partition); 
long myEarliestOffset = offsets[0]; 
// OR for Latest: long myLatestOffset = offsets[0]; 

E per ottenere la corrente di offset devo usare un'API completamente diverso:

short versionID = 0; 
int correlationId = 0; 
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>();  
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition); 
topicPartitionList.add(myTopicAndPartition); 
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId); 
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq); 
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset(); 

Q2: è corretto? perché ci sono due API diverse per ottenere informazioni molto simili?

Q3: importa quale versione ID e correlazione sto usando qui? Penso che versionId dovrebbe essere 0 per kafka pre-0.8.2.1, e essere 1 per 0.8.2.1 e successivi - ma sembra che funzioni anche con 0 per 0.8.2.1 - vedi sotto?

Quindi, per il l'esempio dello stato del tema sopra, e l'uscita al di sopra del ConsumerOffsetChecker, ecco quello che ottengo dalla mia codice Java:

currentOffset = 5; earliestOffset = 29; latestOffset = 29

'currentOffset' sembra essere Ok, 'latestOffset' è corretto, ma 'earliestOffset'? Mi aspetterei che fosse almeno '5'?

Q4: Come può accadere che earliestOffset sia superiore a currentOffset? Il mio unico sospetto è che forse i messaggi dell'argomento siano stati eliminati a causa del criterio di conservazione .... In altri casi questo potrebbe essere successo?

risposta

10

Stavo cercando i mezzi per trovare il ritardo nelle partizioni. E questo implica gli stessi passi che hai fatto. Finora, da qualunque cosa ho imparato, posso darti delle risposte.

  1. logSize punta direttamente al numero di messaggi accumulati in quella partizione specifica. Oppure, specifica l'offset massimo dei messaggi in quella partizione. Offset è l'offset dell'ultimo messaggio consumato con successo. Quindi il ritardo è solo la differenza tra Dimensione registro e Offset.
  2. Sì, è corretto. Finora, questi sono gli unici due modi per trovare l'attuale offset e il più recente o ultimo offset
  3. Non so perché c'è bisogno di specificare versionId. È possibile utilizzare kafka.api.OffsetRequest.CurrentVersion() per ottenere ID versione. Quindi hardcoding può essere evitato. Puoi tranquillamente assumere la correlazione come 0.
  4. Questo è strano. Quando uso EarliestTime() ottengo il primo offset come 0 anche quando il mio attuale offset è progredito molto più avanti. Significa che è l'inizio della partizione. Quindi, quando alcuni messaggi scadono in un tempo futuro, questo primo offset sarà quindi un numero diverso da zero. Ora se i messaggi sono stati cancellati a causa del ritardo del criterio di conservazione dovrebbe essere stato modificato. Sono incerto su questo comportamento. Un modo per essere certi sarebbe, eseguire il consumatore dopo aver annotato tale lettura e aver controllato i suoi log. Dovrebbe mostrare linee come queste.

    2015-06-09 18:49:15 :: DEBUG :: PartitionTopicInfo: 52 :: reset consuma offset delle richieste: 2: offset recuperato = 405952: offset consumato = 335372 a 335372 2015-06-09 18 : 49: 15 :: :: DEBUG PartitionTopicInfo: 52 :: ripristino consumano compensato di richieste: 2: inverosimile offset = 405.952: consumato offset = 335373 al 335373

Si noti che nelle linee di log di cui sopra, recuperati resti compensato lo stesso compenso consumato è in aumento. Infine sarebbe finita in

2015-06-09 18:49:16 :: :: DEBUG PartitionTopicInfo: 52 :: ripristino consumano compensato di richieste: 2: inverosimile Offset = 405.952: consumato offset = 405.952-405.952

Quindi ciò significa che a causa del criterio di conservazione del registro, l'offset da 335372 a 405952 era scaduto

+1

Grazie, @ Shades88! Dopo alcuni test, per il n. 4, sono arrivato alla stessa conclusione, che questa situazione si sarebbe verificata quando i registri sono stati eliminati a causa dei criteri di conservazione. Così ho aggiunto la gestione di questo caso d'angolo alla mia logica di consumo: convalidare che l'offset corrente è> = primo offset e impostarlo su EarliestOffset se non lo è. Grazie! – Marina

+0

Per quanto riguarda 'versionId', se si specifica' 0', gli offset sono memorizzati in Zookeeper e se si utilizza '1', l'offset viene memorizzato in un argomento speciale di Kafka. –

+0

Una pagina utile http://grokbase.com/t/kafka/users/154g34g133/simpleconsumer-getoffsetsbefore-problem –

Problemi correlati