2016-06-02 24 views
8

Sto tentando di usare il SimpleConsumer in Kafka 9 per consentire agli utenti di riprodurre gli eventi da un offset di tempo - ma i messaggi che ricevo di ritorno da Kafka sono in modo molto strano di codifica:Kafka Java SimpleConsumer strana codifica

7icf-test-testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7\�W>8������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819330373,"context":{"userid":0,"username":"testUser"}}�!}�a�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819331637,"context":{"userid":1,"username":"testUser"}}���r�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819332754,"context":{"userid":2,"username":"testUser"}}��������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819333868,"context":{"userid":3,"username":"testUser"}}�p= 
          ������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819334997,"context":{"userid":4,"username" 

Utilizzando KafkaConsumer questi messaggi vengono analizzati correttamente. Ecco il codice che sto usando per recuperare i messaggi utilizzando il SimpleConsumer:

for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) { 
     long currentOffset = messageAndOffset.offset(); 
     if (currentOffset < readOffset) { 
      log.debug("Found an old offset - skip"); 
      continue; 
     } 

     readOffset = messageAndOffset.nextOffset(); 

     int payloadOffset = 14 + messageAndOffset.message().keySize(); // remove first x bytes, schema Id 
     byte[] data = messageAndOffset.message().payload().array(); 
     byte[] realData = Arrays.copyOfRange(data, payloadOffset, data.length - payloadOffset); 
     log.debug("Read " + new String(realData, "UTF-8")); 
} 

ho aggiunto il codice per saltare la prima x byte dopo ho continuato a ottenere UTF-32 errori su byte troppo elevata, che presumo è perché Kafka antepone informazioni come le dimensioni dei messaggi al carico utile. Questo è un artefatto di Avro?

+0

Non sembra Avro - almeno non codifica Avro binaria. Nella codifica binaria non si otterrebbero le informazioni sullo schema nel record. –

+0

Il mio codice è leggermente diverso - Invece di usare 'payload(). Array()', lo faccio come il modo in cui è fatto qui: https://cwiki.apache.org/confluence/display/KAFKA/0.8. 0 + SimpleConsumer + Esempio Eg: 'payload(). Get (bytes)' dove 'bytes' è di tipo' byte [] '. Il metodo 'get()' copia i dati, mentre 'array()' restituisce l'array attuale, e nel Javadocs per 'ByteBuffer' dice:" Le modifiche al contenuto di questo buffer causeranno la modifica del contenuto dell'array restituito, e vice versa." Forse qualcosa del genere è ciò che sta accadendo? –

+0

@Gandalf Per favore apri il tuo messaggio solo nel blocco note ++. Se lo apri con un altro wordpad o un blocco note, sembrerà pericoloso. Quindi aprilo nel blocco note ++ e faccelo sapere. – SkyWalker

risposta

0

non ho mai trovato una buona risposta a questo - (... Per partizione anche se l'implementazione è povero), ma ho passato a utilizzare il SimpleConsumerper interrogare Kafka per gli offset cui avevo bisogno e quindi utilizzare il nativo KafkaConsumer utilizzando seek(TopicPartition, offset) oppure seekToBeginning(TopicPartition)per recuperare i messaggi. Si spera che aggiungeranno, al client nativo, la possibilità di recuperare i messaggi da un determinato timestamp nella prossima versione.

0

Stai cercando questo?

readOffset = messageAndOffset.nextOffset(); 
ByteBuffer payload = messageAndOffset.message().payload(); 

    if(payload == null) { 
     System.err.println("Message is null : " + readOffset); 
     continue; 
    } 

final byte[] realData = new byte[payload.limit()]; 
payload.get(realData); 
System.out.println("Read " + new String(realData, "UTF-8")); 
0

È possibile accedere periodicamente la partizione di un offset si sta commettendo con il timestamp del messaggio (forse non ogni commit) e quindi si può avere una certa misura in futuro per impostare le compensazioni di consumo. Presumo che questo sia per il debug della produzione.

Dubito che aggiungerebbero una funzione del genere, sembra irrealizzabile considerando come funziona Kafka, anche se potrei sbagliarmi, c'è sempre qualcosa di geniale in corso. Farei la cosa del logging.