2015-06-02 30 views
5

La serializzazione Elasticsearch/Spark non sembra funzionare correttamente con i tipi nidificati.Serializzazione Elasticsearch-Spark che non funziona con le classi interne

Ad esempio:

public class Foo implements Serializable { 
    private List<Bar> bars = new ArrayList<Bar>(); 
    // getters and setters 

    public static class Bar implements Serializable { 
    } 
} 

List<Foo> foos = new ArrayList<Foo>(); 
foos.add(new Foo()); 
// Note: Foo object does not contain nested Bar instances 

SparkConf sc = new SparkConf(); // 
sc.setMaster("local"); 
sc.setAppName("spark.app.name"); 
sc.set("spark.serializer", KryoSerializer.class.getName()); 
JavaSparkContext jsc = new JavaSparkContext(sc); 
JavaRDD javaRDD = jsc.parallelize(ImmutableList.copyOf(foos)); 
JavaEsSpark.saveToEs(javaRDD, INDEX_NAME+"/"+TYPE_NAME); 

Il codice di cui sopra sopra opere e documenti di tipo Foo saranno indicizzati all'interno elasticsearch.

Il problema sorge quando la lista bars in un oggetto Foo non è vuoto, per esempio:

Foo = new Foo(); 
Bar = new Foo.Bar(); 
foo.getBars().add(bar); 

Poi, durante l'indicizzazione di elasticsearch, la seguente eccezione viene generata:

org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: 
Cannot handle type [Bar] within type [class Foo], instance [Bar ...]] 
within instance [[email protected]] 
using writer [[email protected]] 
at org.elasticsearch.hadoop.serialization.builder.ContentBuilder.value(ContentBuilder.java:63) 
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.doWriteObject(TemplatedBulk.java:71) 
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:58) 
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:148) 
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:47) 
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:68) 
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:68) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
at org.apache.spark.scheduler.Task.run(Task.scala:64) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
at java.lang.Thread.run(Unknown Source) 

Questi sono le dipendenze Maven rilevanti

<dependency> 
    <groupId>com.sksamuel.elastic4s</groupId> 
    <artifactId>elastic4s_2.11</artifactId> 
    <version>1.5.5</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-core_2.11</artifactId> 
    <version>1.3.1</version> 
</dependency> 

<dependency> 
    <groupId>org.elasticsearch</groupId> 
    <artifactId>elasticsearch-hadoop-cascading</artifactId> 
    <version>2.1.0.Beta4</version> 
</dependency> 

<dependency> 
    <groupId>com.fasterxml.jackson.core</groupId> 
    <artifactId>jackson-databind</artifactId> 
    <version>2.1.3</version> 
</dependency> 

<dependency> 
    <groupId>org.elasticsearch</groupId> 
    <artifactId>elasticsearch-spark_2.10</artifactId> 
    <version>2.1.0.Beta4</version> 
</dependency> 

<dependency> 
    <groupId>org.scala-lang</groupId> 
    <artifactId>scala-xml</artifactId> 
    <version>2.11.0-M4</version> 
</dependency> 

Qual è il modo corretto di indicizzare quando si usano tipi annidati con ElasticSearch e Spark?

Grazie

+0

è necessario serializzare gli oggetti Foo e Bar! – eliasah

+0

attrezzi serializzabili? – user1052610

+0

In Java sì, poiché è un'interfaccia. – eliasah

risposta

0

Guardando lo ScalaValueWriter & codice JdkValueWriter possiamo vedere che solo alcuni tipi sono supportati direttamente. Molto probabilmente la classe interna non è un JavaBean o altro tipo supportato.

+0

Grazie Holden, ma qual è la soluzione allora? Anche se Bar non è una classe interiore ma una classe regolare, non può essere aggiunta come membro di Foo. La stessa eccezione è lanciata. In altre parole, come posso definire la barra in modo che possa essere aggiunta a Foo? – user1052610

+0

Rendilo un bean Java :) – Holden

+0

Oppure una classe di caso – Holden

0

Un giorno ScalaValueWriter & JdkValueWriter sarà eventualmente supportare i tipi definiti dall'utente (come Bar nel nostro esempio), altro che semplici tipi Java come String, int, ecc

Nel frattempo, v'è la seguente soluzione. Invece di avere Foo esporre un elenco di oggetti Bar, trasformare internamente l'elenco in uno Map<String, Object> ed esporlo.

Qualcosa di simile a questo:

private List<Map<String, Object>> bars= new ArrayList<Map<String, Object>>(); 

public List<Map<String, Object>> getBars() { 
    return bars; 
} 

public void setBars(List<Bar> bars) { 
    for (Bar bar: bars){ 
     this.bars.add(bar.getAsMap()); 
    } 
} 
3

Una soluzione potrebbe essere quella di costruire un JSON dall'oggetto che si sta cercando di salvare, usando per esempio Json4s. In questo caso il tuo RDD "JavaEsSpark" sarebbe un RDD di stringhe. Poi è sufficiente chiamare

JavaEsSpark.saveJsonToEs...

invece di

JavaEsSpark.saveToEs...

Questa soluzione mi ha aiutato a salvare innumerevoli ore cercando di capire un modo per serializzare mappe nidificate.

Problemi correlati