2011-09-24 20 views
16

Questo è un po 'uno sparo al buio nel caso in cui qualcuno esperto nell'implementazione Java di Apache Avro stia leggendo questo.In Java, come posso creare un equivalente di un file contenitore Apro Avro senza dover utilizzare un file come supporto?

Il mio obiettivo di alto livello è avere un modo per trasmettere alcune serie di dati avro sulla rete (diciamo semplicemente HTTP per esempio, ma il particolare protocollo non è così importante per questo scopo). Nel mio contesto ho un HttpServletResponse che ho bisogno di scrivere questi dati in qualche modo.

inizialmente ho tentato di scrivere i dati come quello che ammontano a una versione virtuale di un file contenitore Avro (supporre che "risposta" è di tipo HttpServletResponse):

response.setContentType("application/octet-stream"); 
response.setHeader("Content-transfer-encoding", "binary"); 
ServletOutputStream outStream = response.getOutputStream(); 
BufferedOutputStream bos = new BufferedOutputStream(outStream); 

Schema someSchema = Schema.parse(".....some valid avro schema...."); 
GenericRecord someRecord = new GenericData.Record(someSchema); 
someRecord.put("somefield", someData); 
... 

GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema); 
DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<GenericRecord>(datumWriter); 
fileWriter.create(someSchema, bos); 
fileWriter.append(someRecord); 
fileWriter.close(); 
bos.flush(); 

questo era tutto bene e dandy, ad eccezione che si scopre Avro in realtà non fornire un modo per leggere un file contenitore a parte da un file vero e proprio: l'DataFileReader ha solo due costruttori:

public DataFileReader(File file, DatumReader<D> reader); 

e

public DataFileReader(SeekableInput sin, DatumReader<D> reader); 

dove SeekableInput è un modulo personalizzato avro-specifico la cui creazione finisce anche per leggere da un file. Detto questo, a meno che non ci sia un modo per forzare in qualche modo un InputStream in un file (http://stackoverflow.com/questions/578305/create-a-java-file-object-or-equivalent-using-a-byte- array-in-memory-without-a suggerisce che non c'è, e ho provato anche a cercare intorno alla documentazione Java, questo approccio non funzionerà se il lettore all'altra estremità di OutputStream riceve quel file avro container (Non sono sicuro del motivo per cui hanno permesso a uno di generare file avro binary container su un OutputStream arbitrario senza fornire un modo per leggerli dal corrispondente InputStream dall'altra parte, ma questo è oltre il punto). Sembra che l'implementazione del lettore di file contenitore richieda la funzionalità "ricercabile" fornita da un file concreto.

Ok, quindi non sembra che quell'approccio farà quello che voglio. Che ne dici di creare una risposta JSON che imita il file del contenitore avro?

public static Schema WRAPPER_SCHEMA = Schema.parse(
    "{\"type\": \"record\", " + 
    "\"name\": \"AvroContainer\", " + 
    "\"doc\": \"a JSON avro container file\", " + 
    "\"namespace\": \"org.bar.foo\", " + 
    "\"fields\": [" + 
    "{\"name\": \"schema\", \"type\": \"string\", \"doc\": \"schema representing the included data\"}, " + 
    "{\"name\": \"data\", \"type\": \"bytes\", \"doc\": \"packet of data represented by the schema\"}]}" 
); 

Non sono sicuro se questo è il modo migliore per affrontare questo dato i vincoli di cui sopra, ma sembra che questo potrebbe fare il trucco. Metterò lo schema (di "Schema someSchema" dall'alto, per esempio) come una stringa all'interno del campo "schema", e quindi inserirò la forma serializzata in avro-binario di un record che corrisponde a quello schema (ad esempio "GenericRecord"). someRecord ") all'interno del campo" data ".

In realtà volevo sapere un dettaglio specifico di ciò che è descritto di seguito, ma ho pensato che valesse la pena dare un contesto più ampio, così che se ci fosse un migliore approccio di alto livello potrei prendere (questo approccio funziona ma non mi sembra ottimale) per favore fatemelo sapere.

La mia domanda è, supponendo che io vada con questo approccio basato su JSON, come posso scrivere la rappresentazione avrobinary del mio Record nel campo "data" dello schema di AvroContainer? Ad esempio, sono arrivato fino a qui:

ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema); 
Encoder e = new BinaryEncoder(baos); 
datumWriter.write(resultsRecord, e); 
e.flush(); 

GenericRecord someRecord = new GenericData.Record(someSchema); 
someRecord.put("schema", someSchema.toString()); 
someRecord.put("data", ByteBuffer.wrap(baos.toByteArray())); 
datumWriter = new GenericDatumWriter<GenericRecord>(WRAPPER_SCHEMA); 
JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(baos, JsonEncoding.UTF8); 
e = new JsonEncoder(WRAPPER_SCHEMA, jsonGenerator); 
datumWriter.write(someRecord, e); 
e.flush(); 

PrintWriter printWriter = response.getWriter(); // recall that response is the HttpServletResponse 
response.setContentType("text/plain"); 
response.setCharacterEncoding("UTF-8"); 
printWriter.print(baos.toString("UTF-8")); 

ho inizialmente provato omettendo la clausola ByteBuffer.wrap, ma poi poi la linea

datumWriter.write(someRecord, e); 

ha generato un'eccezione che non ho potuto lanciare un array di byte in ByteBuffer.Abbastanza corretto, sembra che quando la classe Encoder (di cui JsonEncoder è una sottoclasse) viene chiamata per scrivere un oggetto avro Bytes, richiede un ByteBuffer da fornire come argomento. Così, ho provato incapsulando il byte [] con java.nio.ByteBuffer.wrap, ma quando i dati sono stati stampati, è stato stampato come una serie di byte rettilineo, senza essere passato attraverso la rappresentazione esadecimale avro:

"data": {"bytes": ".....some gibberish other than the expected format...} 

Questo non sembra giusto. Secondo la documentazione di avro, l'oggetto byte di esempio che danno dice che ho bisogno di inserire un oggetto json, un esempio del quale sembra "\ u00FF", e quello che ho inserito non è chiaramente di quel formato. Quello che voglio sapere ora è il seguente:

  • Che cos'è un formato di byte avro? Assomiglia a "\ uDEADBEEFDEADBEEF ..."?
  • Come faccio a forzare i miei dati avro binari (come output da BinaryEncoder in un array byte []) in un formato che posso inserire nell'oggetto GenericRecord e farlo stampare correttamente in JSON? Ad esempio, voglio un oggetto DATI per il quale posso chiamare su alcuni GenericRecord "someRecord.put (" data ", DATA);" con i miei dati avro serializzati all'interno?
  • Come potrei quindi leggere quei dati in una matrice di byte sull'altra estremità (di consumo), quando viene fornita la rappresentazione JSON del testo e vuole ricreare GenericRecord come rappresentato dal formato JSON di AvroContainer?
  • (reiterando la domanda di prima) C'è un modo migliore in cui potrei fare tutto questo?
+1

org.apache.avro.file.DataFileStream? – Chikei

+3

SeekableInput non è solo un modulo personalizzato avro-specifico la cui creazione finisce per essere letto da un file. C'è [SeekableByteArrayInput] (http://avro.apache.org/docs/current/api/java/org/apache/avro/file/SeekableByteArrayInput.html) che legge da una matrice di byte in memoria. –

+0

Ottima domanda - e l'esigenza di aver bisogno dell'accesso casuale è molto strana, dal momento che è impossibile soddisfarla senza possibilmente un enorme buffer. Eppure sembra che non sia necessario fare altrettanto ... Non so perché sia ​​stato ritenuto necessario un accesso casuale. Molti altri formati di dati non aggiungono tali requisiti per l'elaborazione. – StaxMan

risposta

1

Come detto Knut, se si desidera utilizzare qualcosa di diverso da un file, è possibile:

  • uso SeekableByteArrayInput, come ha detto Knut, per qualsiasi cosa si può calzascarpe in un array di byte
  • Implementa SeekablInput a modo tuo, ad esempio se stavi recuperando da una strana struttura di database.
  • Oppure basta usare un file. Perchè no?

Queste sono le tue risposte.

+0

Fantastico, è esattamente ciò di cui avevo bisogno. –

+4

Inoltre, l'utilizzo di un file aumenta il sovraccarico per l'I/O su disco, quindi se si riceve un array di byte attraverso la rete non si desidera inserirlo prima in un file e quindi leggerlo (disco I/O round trip! !!). –

0

Il modo in cui l'ho risolto era spedire gli schemi separatamente dai dati. Ho impostato un handshake di connessione che trasmette gli schemi dal server, quindi invio i dati codificati avanti e indietro. È necessario creare un oggetto involucro esterno in questo modo:

{'name':'Wrapper','type':'record','fields':[ 
    {'name':'schemaName','type':'string'}, 
    {'name':'records','type':{'type':'array','items':'bytes'}} 
]} 

Dove si codifica la matrice di record, uno per uno, in un array di array di byte codificati. Tutto in un array dovrebbe avere lo stesso schema. Quindi codifichi l'oggetto wrapper con lo schema precedente - imposta "schemaName" come nome dello schema che hai usato per codificare l'array.

Sul server, decodificare prima l'oggetto wrapper. Una volta decodificato l'oggetto wrapper, conosci lo schemaName e disponi di una serie di oggetti che sai come decodificare: usa come vuoi!

noti che è possibile ottenere via senza usare l'oggetto wrapper se si utilizza un protocollo come WebSockets e un motore come Socket.IO (per Node.js) Socket.io ti dà un livello di comunicazione basata su canale tra browser e server. In tal caso, basta usare uno schema specifico per ogni canale, codificare ciascun messaggio prima di inviarlo. È ancora necessario condividere gli schemi all'avvio della connessione, ma se si utilizza WebSockets è facile da implementare.E quando hai finito hai un numero arbitrario di flussi bidirezionali fortemente tipizzati tra client e server.

+0

Anche se non è una soluzione sbagliata, non si avvicina nemmeno ad affrontare la domanda dichiarata dall'OP. – rbellamy

0

Sotto Java e Scala, abbiamo provato a utilizzare initce tramite codice generato utilizzando il codificatore nitro di Scala. Inception è come la libreria Javascript mtth/avsc ha risolto questo problem. Tuttavia, abbiamo incontrato diversi problemi di serializzazione che utilizzavano la libreria Java in cui venivano immessi byte errati nel flusso di byte, in modo coerente - e potremmo non capire da dove provenivano quei byte.

Ovviamente ciò significava creare la nostra implementazione di Varint con codifica ZigZag. Meh.

Eccolo:

package com.terradatum.query 

import java.io.ByteArrayOutputStream 
import java.nio.ByteBuffer 
import java.security.MessageDigest 
import java.util.UUID 

import akka.actor.ActorSystem 
import akka.stream.stage._ 
import akka.stream.{Attributes, FlowShape, Inlet, Outlet} 
import com.nitro.scalaAvro.runtime.GeneratedMessage 
import com.terradatum.diagnostics.AkkaLogging 
import org.apache.avro.Schema 
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord} 
import org.apache.avro.io.EncoderFactory 
import org.elasticsearch.search.SearchHit 

import scala.collection.mutable.ArrayBuffer 
import scala.reflect.ClassTag 

/* 
* The original implementation of this helper relied exclusively on using the Header Avro record and inception to create 
* the header. That didn't work for us because somehow erroneous bytes were injected into the output. 
* 
* Specifically: 
* 1. 0x08 prepended to the magic 
* 2. 0x0020 between the header and the sync marker 
* 
* Rather than continue to spend a large number of hours trying to troubleshoot why the Avro library was producing such 
* erroneous output, we build the Avro Container File using a combination of our own code and Avro library code. 
* 
* This means that Terradatum code is responsible for the Avro Container File header (including magic, file metadata and 
* sync marker) and building the blocks. We only use the Avro library code to build the binary encoding of the Avro 
* records. 
* 
* @see https://avro.apache.org/docs/1.8.1/spec.html#Object+Container+Files 
*/ 
object AvroContainerFileHelpers { 

    val magic: ByteBuffer = { 
    val magicBytes = "Obj".getBytes ++ Array[Byte](1.toByte) 
    val mg = ByteBuffer.allocate(magicBytes.length).put(magicBytes) 
    mg.position(0) 
    mg 
    } 

    def makeSyncMarker(): Array[Byte] = { 
    val digester = MessageDigest.getInstance("MD5") 
    digester.update(s"${UUID.randomUUID}@${System.currentTimeMillis()}".getBytes) 
    val marker = ByteBuffer.allocate(16).put(digester.digest()).compact() 
    marker.position(0) 
    marker.array() 
    } 

    /* 
    * Note that other implementations of avro container files, such as the javascript library 
    * mtth/avsc uses "inception" to encode the header, that is, a datum following a header 
    * schema should produce valid headers. We originally had attempted to do the same but for 
    * an unknown reason two bytes wore being inserted into our header, one at the very beginning 
    * of the header before the MAGIC marker, and one right before the syncmarker of the header. 
    * We were unable to determine why this wasn't working, and so this solution was used instead 
    * where the record/map is encoded per the avro spec manually without the use of "inception." 
    */ 
    def header(schema: Schema, syncMarker: Array[Byte]): Array[Byte] = { 
    def avroMap(map: Map[String, ByteBuffer]): Array[Byte] = { 
     val mapBytes = map.flatMap { 
     case (k, vBuff) => 
      val v = vBuff.array() 
      val byteStr = k.getBytes() 
      Varint.encodeLong(byteStr.length) ++ byteStr ++ Varint.encodeLong(v.length) ++ v 
     } 
     Varint.encodeLong(map.size.toLong) ++ mapBytes ++ Varint.encodeLong(0) 
    } 

    val schemaBytes = schema.toString.getBytes 
    val schemaBuffer = ByteBuffer.allocate(schemaBytes.length).put(schemaBytes) 
    schemaBuffer.position(0) 
    val metadata = Map("avro.schema" -> schemaBuffer) 
    magic.array() ++ avroMap(metadata) ++ syncMarker 
    } 

    def block(binaryRecords: Seq[Array[Byte]], syncMarker: Array[Byte]): Array[Byte] = { 
    val countBytes = Varint.encodeLong(binaryRecords.length.toLong) 
    val sizeBytes = Varint.encodeLong(binaryRecords.foldLeft(0)(_+_.length).toLong) 

    val buff: ArrayBuffer[Byte] = new scala.collection.mutable.ArrayBuffer[Byte]() 

    buff.append(countBytes:_*) 
    buff.append(sizeBytes:_*) 
    binaryRecords.foreach { rec => 
     buff.append(rec:_*) 
    } 
    buff.append(syncMarker:_*) 

    buff.toArray 
    } 

    def encodeBlock[T](schema: Schema, records: Seq[GenericRecord], syncMarker: Array[Byte]): Array[Byte] = { 
    //block(records.map(encodeRecord(schema, _)), syncMarker) 
    val writer = new GenericDatumWriter[GenericRecord](schema) 
    val out = new ByteArrayOutputStream() 
    val binaryEncoder = EncoderFactory.get().binaryEncoder(out, null) 
    records.foreach(record => writer.write(record, binaryEncoder)) 
    binaryEncoder.flush() 
    val flattenedRecords = out.toByteArray 
    out.close() 

    val buff: ArrayBuffer[Byte] = new scala.collection.mutable.ArrayBuffer[Byte]() 

    val countBytes = Varint.encodeLong(records.length.toLong) 
    val sizeBytes = Varint.encodeLong(flattenedRecords.length.toLong) 

    buff.append(countBytes:_*) 
    buff.append(sizeBytes:_*) 
    buff.append(flattenedRecords:_*) 
    buff.append(syncMarker:_*) 

    buff.toArray 
    } 

    def encodeRecord[R <: GeneratedMessage with com.nitro.scalaAvro.runtime.Message[R]: ClassTag](
     entity: R 
): Array[Byte] = 
    encodeRecord(entity.companion.schema, entity.toMutable) 

    def encodeRecord(schema: Schema, record: GenericRecord): Array[Byte] = { 
    val writer = new GenericDatumWriter[GenericRecord](schema) 
    val out = new ByteArrayOutputStream() 
    val binaryEncoder = EncoderFactory.get().binaryEncoder(out, null) 
    writer.write(record, binaryEncoder) 
    binaryEncoder.flush() 
    val bytes = out.toByteArray 
    out.close() 
    bytes 
    } 
} 

/** 
    * Encoding of integers with variable-length encoding. 
    * 
    * The avro specification uses a variable length encoding for integers and longs. 
    * If the most significant bit in a integer or long byte is 0 then it knows that no 
    * more bytes are needed, if the most significant bit is 1 then it knows that at least one 
    * more byte is needed. In signed ints and longs the most significant bit is traditionally 
    * used to represent the sign of the integer or long, but for us it's used to encode whether 
    * more bytes are needed. To get around this limitation we zig-zag through whole numbers such that 
    * negatives are odd numbers and positives are even numbers: 
    * 
    * i.e. -1, -2, -3 would be encoded as 1, 3, 5, and so on 
    * while 1, 2, 3 would be encoded as 2, 4, 6, and so on. 
    * 
    * More information is available in the avro specification here: 
    * @see http://lucene.apache.org/core/3_5_0/fileformats.html#VInt 
    *  https://developers.google.com/protocol-buffers/docs/encoding?csw=1#types 
    */ 
object Varint { 

    import scala.collection.mutable 

    def encodeLong(longVal: Long): Array[Byte] = { 
    val buff = new ArrayBuffer[Byte]() 
    Varint.zigZagSignedLong(longVal, buff) 
    buff.toArray[Byte] 
    } 

    def encodeInt(intVal: Int): Array[Byte] = { 
    val buff = new ArrayBuffer[Byte]() 
    Varint.zigZagSignedInt(intVal, buff) 
    buff.toArray[Byte] 
    } 

    def zigZagSignedLong[T <: mutable.Buffer[Byte]](x: Long, dest: T): Unit = { 
    // sign to even/odd mapping: http://code.google.com/apis/protocolbuffers/docs/encoding.html#types 
    writeUnsignedLong((x << 1)^(x >> 63), dest) 
    } 

    def writeUnsignedLong[T <: mutable.Buffer[Byte]](v: Long, dest: T): Unit = { 
    var x = v 
    while ((x & 0xFFFFFFFFFFFFFF80L) != 0L) { 
     dest += ((x & 0x7F) | 0x80).toByte 
     x >>>= 7 
    } 
    dest += (x & 0x7F).toByte 
    } 

    def zigZagSignedInt[T <: mutable.Buffer[Byte]](x: Int, dest: T): Unit = { 
    writeUnsignedInt((x << 1)^(x >> 31), dest) 
    } 

    def writeUnsignedInt[T <: mutable.Buffer[Byte]](v: Int, dest: T): Unit = { 
    var x = v 
    while ((x & 0xFFFFF80) != 0L) { 
     dest += ((x & 0x7F) | 0x80).toByte 
     x >>>= 7 
    } 
    dest += (x & 0x7F).toByte 
    } 
} 
Problemi correlati