Non è una cosa semplice, perché solo i singoli broker sanno quali sono le ultime informazioni di offset per un determinato argomento/partizione.
È possibile eseguire un OffsetRequest
. Quanto segue restituirà le correzioni più recenti e più recenti per un argomento/partizione (è Scala, ma dovresti essere in grado di ottenere l'idea se non usi Scala).
Nota è necessario utilizzare un SimpleConsumer
connesso al broker che è il leader per la partizione richiesta. Di solito quello che faccio è creare un SimpleConsumer
per ognuno dei miei broker. Poi faccio una richiesta di dati di meta e ottenere la partizione di mappatura leader, allora partizione foreach faccio questo:
def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int) : (Long,Long) = {
val time = kafka.api.OffsetRequest.LatestTime
val reqInfo = Map[TopicAndPartition,PartitionOffsetRequestInfo]((new TopicAndPartition(topic, partition)) -> (new PartitionOffsetRequestInfo(time, 1000)))
val req = new kafka.javaapi.OffsetRequest(reqInfo, kafka.api.OffsetRequest.CurrentVersion, "offReq")
val resp = consumer.getOffsetsBefore(req)
val offsets = resp.offsets(topic, partition)
if (offsets.size > 0) (offsets(offsets.size - 1), offsets(0))
else (0, -1)
}
Spero che questo aiuti.
fonte
2016-05-01 18:08:45
ottimo lavoro @David, ne hai un esempio completo? E ... perché stai usando 'kafka.javaapi.OffsetRequest' se esiste l'oggetto Scala con lo stesso nome? – salvob