2015-11-26 7 views
6

L'Avro SpecificRecord (ovvero le classi java generate) è compatibile con l'evoluzione dello schema? Cioè se ho una fonte di messaggi Avro (nel mio caso, kafka) e voglio deserializzare quei messaggi su un record specifico, è possibile farlo in sicurezza?Come convertire da GenericRecord a SpecificRecord in Avro per gli schemi compatibili

quello che vedo:

  • l'aggiunta di un campo per la fine di uno schema funziona bene - può deserializzare ok per specificrecord
  • aggiunta di un campo al centro non - cioè rompe clienti esistenti

Anche se i messaggi sono compatibili, questo è un problema.

Se riesco a trovare il nuovo schema (utilizzando ad esempio confluenti registro schema) posso deserializzare a GenericRecord, ma non sembra essere un modo per mappare da genericrecord a specificrecord di diversi schemi ..

MySpecificType message = (T SpecificData.get().deepCopy(MySpecificType.SCHEMA$, genericMessage); 

Deepcopy è menzionato in vari punti ma utilizza l'indice quindi non funziona.

C'è un modo sicuro per mappare tra due oggetti avro quando si hanno entrambi gli schemi e sono compatibili? Anche se potessi mappare da genercrecord a genericrecord, ciò farebbe come potrei fare il trucco deepcopy per completare il lavoro.

+2

Hai mai a capire come fare questo? Sono bloccato sullo stesso problema, continuo a ottenere un errore "org.apache.avro.generic.GenericData $ Record non può essere inoltrato a org.apache.avro.specific.SpecificRecord" ... – Matt

+1

Ho usato una classe AutoMapper che ha mappato in base al nome del campo. Un esempio di implementazione è qui: https://gist.github.com/markdav/01623363b5b2508b8e5ef6146caedb1b –

risposta

2

Qui sono disponibili test di esempio per la conversione specifica del tipo di dati. Tutti i suoi nella configurazione '' specificDeserializerProps

https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/test/java/io/confluent/kafka/serializers/KafkaAvroSerializerTest.java

ho aggiunto il seguente config e ottenuto il tipo specifico come voleva.

HashMap<String, String> specificDeserializerProps = new HashMap<String, String>(); 
specificDeserializerProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus"); 
specificDeserializerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true"); 
specificAvroDeserializer = new KafkaAvroDeserializer(schemaRegistry, specificDeserializerProps); 

Speranza che aiuta

+0

Questo deserializzatore è nuovo .. Ci proverò! –

Problemi correlati