2015-10-17 16 views
7

Sto tentando di utilizzare eventi avro serializzati da una coda Kafka. La coda di kafka viene popolata usando un semplice produttore di java. Per chiarezza sto condividendo i tre componenti:Logstash con Kafka: impossibile decodificare avro

schema Avro di file

{"namespace": "example.avro", 
"type": "record", 
"name": "User", 
"fields": [ 
    {"name": "name", "type": "string"}, 
    {"name": "favorite_number", "type": ["int", "null"]}, 
    {"name": "favorite_color", "type": ["string", "null"]} 
] 
} 

codice Java Produttore frammento di (User.class è prodotto con Avro-tools)

User user1 = new User(); 
    user1.setName("Alyssa"); 
    user1.setFavoriteNumber(256); 
    user1.setFavoriteColor("blue"); 
    String topic = "MemoryTest"; 

    // Properties set in 'props' 
    KafkaProducer<Message, byte[]> producer = new KafkaProducer<Message, byte[]>(props); 

    ByteArrayOutputStream out = new ByteArrayOutputStream(); 
    DatumWriter<User> writer = new SpecificDatumWriter<User>(User.class); 
    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); 
    writer.write(user1, encoder); 
    encoder.flush(); 
    out.close(); 
    byte[] serializedBytes = out.toByteArray(); 
    producer.send(new ProducerRecord<Message, byte[]>(topic, serializedBytes)); 

Logstash Config file

input { 
     kafka { 
       zk_connect => "localhost:2181" 
       topic_id => "MemoryTest" 
       type => "standard_event" 
       group_id => "butiline_dash_prod" 
     reset_beginning => true 
     auto_offset_reset => smallest 
     codec => { 
       avro => { 
        schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc" 
       } 
      } 
     } 
} 

output { 
    stdout { 
    codec => rubydebug 
    } 
} 

Problema

Il gasdotto non riesce a livello logstash. Quando un nuovo evento viene inserito in Kafka, viene visualizzato nella console logstash:

Alyssa�blue {:exception=>#<NoMethodError: undefined method `decode' for ["avro", {"schema_uri"=>"/opt/ELK/logstash-1.5.4/bin/user.avsc"}]:Array>, :backtrace=>["/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:169:in `queue_event'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:139:in `run'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:177:in `inputworker'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:171:in `start_input'"], :level=>:error} 

risposta

10

Finalmente ho trovato l'errore. Invece di questo (come suggerito sul sito web Logstash - https://www.elastic.co/guide/en/logstash/current/plugins-codecs-avro.html)

codec => { 
    avro => { 
     schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc" 
    } 
} 

La sintassi corretta è (come suggerito nella documentazione del plugin https://github.com/logstash-plugins/logstash-codec-avro/blob/master/DEVELOPER.md):

codec => avro { 
     schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc" 
} 

Credo che la sintassi è cambiato.

Problemi correlati