2016-07-18 7 views
8

Sto scrivendo un utente kafka utilizzando Java. Voglio mantenere il tempo reale del messaggio, quindi se ci sono troppi messaggi in attesa di consumare, come 1000 o più, dovrei abbandonare i messaggi non consumati e iniziare a consumare dall'ultimo offset.Come posso ottenere l'ULTIMO offset di un argomento di kafka?

Per questo problema, provo a confrontare l'ultimo offset commesso e l'ultimo offset di un argomento (solo 1 partizione), se la differenza tra questi due offset è maggiore di un determinato importo, imposterò l'ultimo offset di l'argomento come prossimo offset in modo da poter abbandonare quei messaggi ridondanti.

Ora il mio problema è come ottenere l'ultimo offset di un argomento, alcune persone dicono che posso usare il vecchio consumatore, ma è troppo complicato, il nuovo consumatore ha questa funzione?

risposta

12

Anche il nuovo consumatore è complicato.

//assign the topic consumer.assign();

//seek to end of the topic consumer.seekToEnd();

//the position is the latest offset consumer.position();

+0

Questo non funziona se si desidera calcolare la differenza tra l'offset corrente del client e l'ultimo offset di argomento kafka noto! – hiaclibe

5

Per la versione Kafka: 0.10.1.1

// Get the diff of current position and latest offset 
Set<TopicPartition> partitions = new HashSet<TopicPartition>(); 
TopicPartition actualTopicPartition = new TopicPartition(record.topic(), record.partition()); 
partitions.add(actualTopicPartition); 
Long actualEndOffset = this.consumer.endOffsets(partitions).get(actualTopicPartition); 
long actualPosition = consumer.position(actualTopicPartition);   
System.out.println(String.format("diff: %s (actualEndOffset:%s; actualPosition=%s)", actualEndOffset -actualPosition ,actualEndOffset, actualPosition)); 
0
KafkaConsumer<String, String> consumer = ... 
consumer.subscribe(Collections.singletonList(topic)); 
TopicPartition topicPartition = new TopicPartition(topic, partition); 
consumer.poll(0); 
consumer.seekToEnd(Collections.singletonList(topicPartition)); 
long currentOffset = consumer.position(topicPartition) -1; 

Sopra frammento restituisce l'offset per l'argomento dato e la partizione corrente messaggio impegnati numero.

Problemi correlati