2015-04-15 20 views
9

Ho riscontrato un problema con il tentativo di analizzare JSON nel mio lavoro spark. Sto usando spark 1.1.0, json4s e Cassandra Spark Connector. L'eccezione generata è:Spark eccezione non serializzabile durante l'analisi di JSON con json4s

java.io.NotSerializableException: org.json4s.DefaultFormats

esaminando l'oggetto associato DefaultFormats, e con questo stack domanda, è chiaro che DefaultFormats non possono essere serializzati. La domanda ora è cosa fare.

Posso vedere questo ticket apparentemente risolto questo problema nella base di codice di scintilla, aggiungendo la parola chiave transitoria, ma non sono sicuro esattamente come o dove applicarlo al mio caso. La soluzione per istanziare solo la classe DefaultFormats sugli executors, per evitare la serializzazione tutti insieme? Esiste un'altra libreria di analisi JSON per scala/scintilla utilizzata dalle persone? Inizialmente ho provato a usare jackson da solo, ma ho riscontrato alcuni errori con annotazioni che non sono stato in grado di risolvere facilmente e json4s ha funzionato immediatamente. Ecco il mio codice:

import org.json4s._ 
import org.json4s.jackson.JsonMethods._ 
implicit val formats = DefaultFormats 

val count = rdd.map(r => checkUa(r._2, r._1)).reduce((x, y) => x + y) 

Eseguo l'analisi json nella funzione checkUa. Ho provato a fare il conteggio pigro, nella speranza che ritardasse l'esecuzione in qualche modo, ma non ha avuto alcun effetto. Forse spostando il valore implicito all'interno di checkUA? Qualche consiglio molto apprezzato.

risposta

12

Questo ha già risposto nel an open ticket with json4s. La soluzione è mettere la dichiarazione implicit all'interno della funzione

val count = rdd 
       .map(r => {implicit val formats = DefaultFormats; checkUa(r._2, r._1)}) 
       .reduce((x, y) => x + y) 
+0

Grazie per si risponde, ha funzionato come un fascino. Ecco un'altra domanda sulla scintilla JSON4s che ho ... bloccato di nuovo. http://stackoverflow.com/questions/29666487/json4s-cant-find-constructor-w-spark – worker1138

+0

Questo non sembra funzionare per me: https://stackoverflow.com/questions/48454611/spark-using- json4s-has-serialization-fail – pferrel

2

Ho avuto lo stesso errore quando metto la dichiarazione implicit val formats = ... all'interno del metodo che contiene l'analisi, invece di dichiarare sulla classe (oggetto).

Quindi questo getterebbe un errore:

object Application { 

    //... Lots of other code here, which eventually calls 
    // setupStream(...) 

    def setupStream(streamingContext: StreamingContext, 
          brokers: String, 
          topologyTopicName: String) = { 
    implicit val formats = DefaultFormats 
    _createDStream(streamingContext, brokers, topologyTopicName) 
     // Remove the message key, which is always null in our case 
     .map(_._2) 
     .map((json: String) => parse(json).camelizeKeys 
     .extract[Record[TopologyMetadata, Unused]]) 
     .print() 
} 

Ma questo andrebbe bene:

object Application { 

    implicit val formats = DefaultFormats 

    //... Lots of other code here, which eventually calls 
    // setupStream(...) 

    def setupStream(streamingContext: StreamingContext, 
          brokers: String, 
          topologyTopicName: String) = { 
    _createDStream(streamingContext, brokers, topologyTopicName) 
     // Remove the message key, which is always null in our case 
     .map(_._2) 
     .map((json: String) => parse(json).camelizeKeys 
     .extract[Record[TopologyMetadata, Unused]]) 
     .print() 
} 
+0

Puoi condividere il tuo codice? – worker1138

+0

Certo, ho modificato la mia risposta con qualche esempio di codice –

+0

Questo non sembra funzionare per me: https://stackoverflow.com/questions/48454611/spark-using-json4s-has-serialization-fails – pferrel

Problemi correlati