2011-11-28 10 views
21

Sto cercando di utilizzare Avro per i messaggi in corso la lettura/scrittura di Kafka. Qualcuno ha un esempio di utilizzo del codificatore binario Avro per codificare/decodificare i dati che verranno inseriti in una coda di messaggi?Come codificare/decodificare i messaggi di Kafka usando il codificatore binario Avro?

Ho bisogno la parte Avro più che la parte Kafka. O forse dovrei considerare una soluzione diversa? Fondamentalmente, sto cercando di trovare una soluzione più efficiente per JSON per quanto riguarda lo spazio. Avro è stato appena menzionato poiché può essere più compatto di JSON.

risposta

11

ho finalmente ricordato di chiedere nelle mailing list Kafka e ottenuto il seguente come una risposta, che ha funzionato perfettamente.

Sì, è possibile inviare messaggi come matrici di byte. Se si guarda il costruttore della classe di segnalazione, si vedrà -

def questo (byte: Array [Byte])

Ora, guardando il produttore di invio) API (-

def inviare (producerData: ProducerData [K, V] *)

È possibile impostare V ad essere di tipo messaggio e K per ciò che si desidera la vostra chiave di essere. Se non si cura di partizionamento utilizzando una chiave, quindi impostare che a messaggio tipo pure.

Grazie, Neha

2

Invece di Avro, si potrebbe anche prendere in considerazione semplicemente la compressione dei dati; o con gzip (buona compressione, CPU più alta) o LZF o Snappy (molto più veloce, compressione più lenta).

Oppure, in alternativa c'è anche Smile binary JSON, supportato in Java da Jackson (con this extension): è formato compatto binario, e molto più facile da usare rispetto Avro:

ObjectMapper mapper = new ObjectMapper(new SmileFactory()); 
byte[] serialized = mapper.writeValueAsBytes(pojo); 
// or back 
SomeType pojo = mapper.readValue(serialized, SomeType.class); 

fondamentalmente stesso codice come con JSON, ad eccezione per passare fabbrica di formati diversi. Dal punto di vista della dimensione dei dati, se Smile o Avro sono più compatti, dipende dai dettagli del caso d'uso; ma entrambi sono più compatti di JSON.

vantaggio c'è che questo funziona veloce sia con JSON e sorriso, con lo stesso codice, utilizzando solo POJO. Rispetto ad Avro che richiede la generazione di codice o un sacco di codice manuale per confezionare e decomprimere GenericRecord s.

7

Se si desidera ottenere un array di byte da un messaggio Avro (la parte Kafka è già risposto), utilizzare l'encoder binario:

GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema); 
    ByteArrayOutputStream os = new ByteArrayOutputStream(); 
    try { 
     Encoder e = EncoderFactory.get().binaryEncoder(os, null); 
     writer.write(record, e); 
     e.flush(); 
     byte[] byteData = os.toByteArray(); 
    } finally { 
     os.close(); 
    } 
+0

È possibile inviare questo byteData a KafkaBroker e leggerlo dal consumatore della console? Quale dovrebbe essere il serializzatore della chiave Producer? – user2441441

+0

Come menzionato nella risposta, la parte kafka è documentata in altre risposte: http://stackoverflow.com/a/8348264/5266 e http://stackoverflow.com/a/32341917/5266 –

12

Questo è un esempio di base. Non l'ho provato con più partizioni/argomenti.

// Esempio codice del produttore

import org.apache.avro.Schema; 
import org.apache.avro.generic.GenericData; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.avro.io.*; 
import org.apache.avro.specific.SpecificDatumReader; 
import org.apache.avro.specific.SpecificDatumWriter; 
import org.apache.commons.codec.DecoderException; 
import org.apache.commons.codec.binary.Hex; 
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 
import java.io.ByteArrayOutputStream; 
import java.io.File; 
import java.io.IOException; 
import java.nio.charset.Charset; 
import java.util.Properties; 


public class ProducerTest { 

    void producer(Schema schema) throws IOException { 

     Properties props = new Properties(); 
     props.put("metadata.broker.list", "0:9092"); 
     props.put("serializer.class", "kafka.serializer.DefaultEncoder"); 
     props.put("request.required.acks", "1"); 
     ProducerConfig config = new ProducerConfig(props); 
     Producer<String, byte[]> producer = new Producer<String, byte[]>(config); 
     GenericRecord payload1 = new GenericData.Record(schema); 
     //Step2 : Put data in that genericrecord object 
     payload1.put("desc", "'testdata'"); 
     //payload1.put("name", "अasa"); 
     payload1.put("name", "dbevent1"); 
     payload1.put("id", 111); 
     System.out.println("Original Message : "+ payload1); 
     //Step3 : Serialize the object to a bytearray 
     DatumWriter<GenericRecord>writer = new SpecificDatumWriter<GenericRecord>(schema); 
     ByteArrayOutputStream out = new ByteArrayOutputStream(); 
     BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); 
     writer.write(payload1, encoder); 
     encoder.flush(); 
     out.close(); 

     byte[] serializedBytes = out.toByteArray(); 
     System.out.println("Sending message in bytes : " + serializedBytes); 
     //String serializedHex = Hex.encodeHexString(serializedBytes); 
     //System.out.println("Serialized Hex String : " + serializedHex); 
     KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>("page_views", serializedBytes); 
     producer.send(message); 
     producer.close(); 

    } 


    public static void main(String[] args) throws IOException, DecoderException { 
     ProducerTest test = new ProducerTest(); 
     Schema schema = new Schema.Parser().parse(new File("src/test_schema.avsc")); 
     test.producer(schema); 
    } 
} 

// Esempio codice del consumo

Parte 1: Codice Consumer group: come si può avere più di più i consumatori per le partizioni multiple/argomenti.

import kafka.consumer.ConsumerConfig; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 

import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
import java.util.concurrent.Executor; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

/** 
* Created by on 9/1/15. 
*/ 
public class ConsumerGroupExample { 
    private final ConsumerConnector consumer; 
    private final String topic; 
    private ExecutorService executor; 

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic){ 
     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
       createConsumerConfig(a_zookeeper, a_groupId)); 
     this.topic = a_topic; 
    } 

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId){ 
     Properties props = new Properties(); 
     props.put("zookeeper.connect", a_zookeeper); 
     props.put("group.id", a_groupId); 
     props.put("zookeeper.session.timeout.ms", "400"); 
     props.put("zookeeper.sync.time.ms", "200"); 
     props.put("auto.commit.interval.ms", "1000"); 

     return new ConsumerConfig(props); 
    } 

    public void shutdown(){ 
     if (consumer!=null) consumer.shutdown(); 
     if (executor!=null) executor.shutdown(); 
     System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); 
     try{ 
      if(!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)){ 

      } 
     }catch(InterruptedException e){ 
      System.out.println("Interrupted"); 
     } 

    } 


    public void run(int a_numThreads){ 
     //Make a map of topic as key and no. of threads for that topic 
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
     topicCountMap.put(topic, new Integer(a_numThreads)); 
     //Create message streams for each topic 
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
     List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 

     //initialize thread pool 
     executor = Executors.newFixedThreadPool(a_numThreads); 
     //start consuming from thread 
     int threadNumber = 0; 
     for (final KafkaStream stream : streams) { 
      executor.submit(new ConsumerTest(stream, threadNumber)); 
      threadNumber++; 
     } 
    } 
    public static void main(String[] args) { 
     String zooKeeper = args[0]; 
     String groupId = args[1]; 
     String topic = args[2]; 
     int threads = Integer.parseInt(args[3]); 

     ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic); 
     example.run(threads); 

     try { 
      Thread.sleep(10000); 
     } catch (InterruptedException ie) { 

     } 
     example.shutdown(); 
    } 


} 

Parte 2: Consumatore individuale che effettivamente consuma i messaggi.

import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.message.MessageAndMetadata; 
import org.apache.avro.Schema; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.avro.generic.IndexedRecord; 
import org.apache.avro.io.DatumReader; 
import org.apache.avro.io.Decoder; 
import org.apache.avro.io.DecoderFactory; 
import org.apache.avro.specific.SpecificDatumReader; 
import org.apache.commons.codec.binary.Hex; 

import java.io.File; 
import java.io.IOException; 

public class ConsumerTest implements Runnable{ 

    private KafkaStream m_stream; 
    private int m_threadNumber; 

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) { 
     m_threadNumber = a_threadNumber; 
     m_stream = a_stream; 
    } 

    public void run(){ 
     ConsumerIterator<byte[], byte[]>it = m_stream.iterator(); 
     while(it.hasNext()) 
     { 
      try { 
       //System.out.println("Encoded Message received : " + message_received); 
       //byte[] input = Hex.decodeHex(it.next().message().toString().toCharArray()); 
       //System.out.println("Deserializied Byte array : " + input); 
       byte[] received_message = it.next().message(); 
       System.out.println(received_message); 
       Schema schema = null; 
       schema = new Schema.Parser().parse(new File("src/test_schema.avsc")); 
       DatumReader<GenericRecord> reader = new SpecificDatumReader<GenericRecord>(schema); 
       Decoder decoder = DecoderFactory.get().binaryDecoder(received_message, null); 
       GenericRecord payload2 = null; 
       payload2 = reader.read(null, decoder); 
       System.out.println("Message received : " + payload2); 
      }catch (Exception e) { 
       e.printStackTrace(); 
       System.out.println(e); 
      } 
     } 

    } 


} 

prova AVRO schema:

{ 
    "namespace": "xyz.test", 
    "type": "record", 
    "name": "payload", 
    "fields":[ 
     { 
      "name": "name", "type": "string" 
     }, 
     { 
      "name": "id", "type": ["int", "null"] 
     }, 
     { 
      "name": "desc", "type": ["string", "null"] 
     } 
    ] 
} 

Le cose importanti da notare sono:

  1. Youll bisogno del Kafka standard ed vasetti Avro per eseguire questo codice, fuori dalla scatola.

  2. È molto importante props.put ("serializer.class", "kafka.serializer.DefaultEncoder"); Don t use stringEncoder as that won t funziona se si invia un array di byte come messaggio.

  3. È possibile convertire il byte [] in una stringa esadecimale e inviarlo e sul consumatore riconvertire la stringa esadecimale in byte [] e quindi nel messaggio originale.

  4. Esegui il guardiano e il broker come indicato qui: - http://kafka.apache.org/documentation.html#quickstart e crea un argomento chiamato "page_views" o qualsiasi altra cosa desideri.

  5. Eseguire ProducerTest.java e quindi ConsumerGroupExample.java e vedere i dati avro prodotti e consumati.

+0

Grazie per l'aiuto! ! Ho provato questo, ma nel codice utente la mia funzione it.hasNext() restituisce false in modo che il controllo non entri mai nel ciclo while. C'è qualche idea su cosa posso fare di sbagliato? –

3

Aggiornamento risposta.

Kafka ha un Avro serializzatore/deserializzatore con Maven (SBT formattato) coordinate

"io.confluent" % "kafka-avro-serializer" % "3.0.0" 

si passa un'istanza KafkaAvroSerializer al costruttore KafkaProducer.

Quindi è possibile creare istanze Avro GenericRecord e utilizzarle come valori all'interno di istanze di Kafka ProducerRecord che è possibile inviare con KafkaProducer.

Sul lato consumer Kafka, si utilizza KafkaAvroDeserializer e KafkaConsumer.

+0

Sareste in grado di fornire un esempio breve ma completo? –

+1

Questo funziona solo con il repository Maven di Confluent aggiunto, dal momento che non pubblicano le risorse in Maven Central: http://packages.confluent.io/maven –

Problemi correlati