2016-05-01 14 views

risposta

3

Non è una cosa semplice, perché solo i singoli broker sanno quali sono le ultime informazioni di offset per un determinato argomento/partizione.

È possibile eseguire un OffsetRequest. Quanto segue restituirà le correzioni più recenti e più recenti per un argomento/partizione (è Scala, ma dovresti essere in grado di ottenere l'idea se non usi Scala).

Nota è necessario utilizzare un SimpleConsumer connesso al broker che è il leader per la partizione richiesta. Di solito quello che faccio è creare un SimpleConsumer per ognuno dei miei broker. Poi faccio una richiesta di dati di meta e ottenere la partizione di mappatura leader, allora partizione foreach faccio questo:

def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int) : (Long,Long) = { 
    val time = kafka.api.OffsetRequest.LatestTime 
    val reqInfo = Map[TopicAndPartition,PartitionOffsetRequestInfo]((new TopicAndPartition(topic, partition)) -> (new PartitionOffsetRequestInfo(time, 1000))) 
    val req = new kafka.javaapi.OffsetRequest(reqInfo, kafka.api.OffsetRequest.CurrentVersion, "offReq") 
    val resp = consumer.getOffsetsBefore(req) 
    val offsets = resp.offsets(topic, partition) 
    if (offsets.size > 0) (offsets(offsets.size - 1), offsets(0)) 
    else (0, -1) 
} 

Spero che questo aiuti.

+0

ottimo lavoro @David, ne hai un esempio completo? E ... perché stai usando 'kafka.javaapi.OffsetRequest' se esiste l'oggetto Scala con lo stesso nome? – salvob

Problemi correlati