2015-03-27 17 views
6

Questo è un esempio di codice di lavoro:org.apache.spark.SparkException: Task non serializzabile

JavaPairDStream<String, String> messages = KafkaUtils.createStream(javaStreamingContext, zkQuorum, group, topicMap); 
messages.print(); 
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { 
    @Override 
    public String call(Tuple2<String, String> tuple2) { 
     return tuple2._2(); 
    } 
}); 

ottengo l'errore qui sotto:

ERROR: 
org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1435) 
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438) 
    at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:140) 
    at org.apache.spark.streaming.api.java.JavaPairDStream.map(JavaPairDStream.scala:46) 
+1

Beh, se funziona è grande :). In caso contrario, è possibile attivare il debug della serializzazione Java con '-Dsun.io.serialization.extendedDebugInfo = true'. –

+1

Grazie, non va bene, ci ho provato. JavaDStream linee = messages.map (nuova funzione , String>() { @Override chiamata public String (Tuple2 tuple2) { ritorno tuple2._2(); } }); Questa riga di problemi di codice. –

+0

Abbastanza sicuro che questo codice sia Java e non Scala (vale a dire il tag) – SparkleGoat

risposta

14

Dal momento che si sta definendo la vostra funzione mappa utilizzando una classe interna anonima, la classe contenente deve anche essere serializzabile. Definisci la tua funzione mappa come una classe separata o rendila una classe interiore statica. Dalla documentazione di Java (http://docs.oracle.com/javase/8/docs/platform/serialization/spec/serial-arch.html):

Note - Serialization of inner classes (i.e., nested classes that are not static member classes), including local and anonymous classes, is strongly discouraged for several reasons. Because inner classes declared in non-static contexts contain implicit non-transient references to enclosing class instances, serializing such an inner class instance will result in serialization of its associated outer class instance as well.

+0

Grazie mille! –

+0

Felice di averlo aiutato! Si prega di accettare la risposta se lo ha fatto – InPursuit

+0

Spark sta cercando di serializzare l'oggetto passato alla mappa ma non può serializzarlo perché non implementa Serializable? Perché Spark sta facendo serializzazione? E se definiamo la funzione mappa come una classe separata, dobbiamo renderla anche serializzabile? – Johan

2

solo fornendo il codice di esempio:

JavaDStream<String> lines = messages.map(mapFunc); 

dichiarare la classe interna come una variabile statica:

static Function<Tuple2<String, String>, String> mapFunc=new Function<Tuple2<String, String>, String>() { 
    @Override 
    public String call(Tuple2<String, String> tuple2) { 
     return tuple2._2(); 
    } 
} 
Problemi correlati