Sto provando a utilizzare l'API consumer di basso livello per gestire gli offset manualmente, con l'ultimo kafka_2.10-0.8.2.1. Per verificare che gli offset che ho commesso/letto da Kafka siano corretti, io uso lo strumento kafka.tools.ConsumerOffsetChecker.Chiarificazione operazioni di offset API Kafka Java
Ecco un esempio di output per il mio argomento di gruppo/consumatore:
./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group elastic_search_group --zookeeper localhost:2181 --topic my_log_topic
Group Topic Pid Offset logSize Lag Owner
elastic_search_group my_log_topic 0 5 29 24 none
Ecco la mia interpretazione del risultato:
Offset = 5 -> questo è l'attuale scostamento di il mio consumatore 'elastic_search_group'
logsize = 29 -> questo è l'ultimo di offset - l'offset del messaggio successivo che verrà a questo tema/partizione
Lag = 24 -> 29-5 - quanti messaggi non sono ancora elaborati dal mio consumatore 'elastic_search_group'
Pid - partizione ID
Q1: è corretto?
Ora, voglio ottenere le stesse informazioni dal mio utente Java. Qui ho scoperto che dovevo utilizzare due API diverse:
kafka.javaapi. OffsetRequest per ottenere offset più recenti e più recenti, ma kafka.javaapi. OffsetFetchRequest per ottenere l'offset corrente.
Per arrivare non prima (o più recente) di offset che faccio:
TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1));
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
long[] offsets = response.offsets(topic, partition);
long myEarliestOffset = offsets[0];
// OR for Latest: long myLatestOffset = offsets[0];
E per ottenere la corrente di offset devo usare un'API completamente diverso:
short versionID = 0;
int correlationId = 0;
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>();
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition);
topicPartitionList.add(myTopicAndPartition);
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId);
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq);
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset();
Q2: è corretto? perché ci sono due API diverse per ottenere informazioni molto simili?
Q3: importa quale versione ID e correlazione sto usando qui? Penso che versionId dovrebbe essere 0 per kafka pre-0.8.2.1, e essere 1 per 0.8.2.1 e successivi - ma sembra che funzioni anche con 0 per 0.8.2.1 - vedi sotto?
Quindi, per il l'esempio dello stato del tema sopra, e l'uscita al di sopra del ConsumerOffsetChecker, ecco quello che ottengo dalla mia codice Java:
currentOffset = 5; earliestOffset = 29; latestOffset = 29
'currentOffset' sembra essere Ok, 'latestOffset' è corretto, ma 'earliestOffset'? Mi aspetterei che fosse almeno '5'?
Q4: Come può accadere che earliestOffset sia superiore a currentOffset? Il mio unico sospetto è che forse i messaggi dell'argomento siano stati eliminati a causa del criterio di conservazione .... In altri casi questo potrebbe essere successo?
Grazie, @ Shades88! Dopo alcuni test, per il n. 4, sono arrivato alla stessa conclusione, che questa situazione si sarebbe verificata quando i registri sono stati eliminati a causa dei criteri di conservazione. Così ho aggiunto la gestione di questo caso d'angolo alla mia logica di consumo: convalidare che l'offset corrente è> = primo offset e impostarlo su EarliestOffset se non lo è. Grazie! – Marina
Per quanto riguarda 'versionId', se si specifica' 0', gli offset sono memorizzati in Zookeeper e se si utilizza '1', l'offset viene memorizzato in un argomento speciale di Kafka. –
Una pagina utile http://grokbase.com/t/kafka/users/154g34g133/simpleconsumer-getoffsetsbefore-problem –