2016-01-15 13 views
15

Voglio creare un nuovo mongodb RDD ogni volta che entro foreachRDD. Tuttavia ho problemi di serializzazione:Spark Streaming: foreachRDD aggiorna il mio mongo RDD

mydstream 
    .foreachRDD(rdd => { 
     val mongoClient = MongoClient("localhost", 27017) 
     val db = mongoClient(mongoDatabase) 
     val coll = db(mongoCollection) 
     // ssc is my StreamingContext 
     val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) }) 

Questo mi darà un errore:

object not serializable (class: org.apache.spark.streaming.StreamingContext, value: [email protected]) 

Qualche idea?

+0

'SparkContext' non è serializzabile, quindi non è possibile utilizzare all'interno di alcuna trasformazione o metodi di azione, è necessario utilizzare solo nella classe di driver. – Shankar

+0

c'è qualche ragione specifica per cui stai convertendo la lista in rdd all'interno del metodo foreachRDD? – Shankar

risposta

7

Si potrebbe provare ad usare rdd.context che restituisce uno SparkContext o uno SparkStreamingContext (se RDD è un DSTREAM).

mydstream foreachRDD { rdd => { 
     val mongoClient = MongoClient("localhost", 27017) 
     val db = mongoClient(mongoDatabase) 
     val coll = db(mongoCollection) 
     val modelsRDDRaw = rdd.context.parallelize(coll.find().toList) }) 

In realtà, sembra che RDD ha anche un metodo .sparkContext. Onestamente non conosco la differenza, forse sono alias (?).

2

Nella mia comprensione è necessario aggiungere se si dispone di un oggetto "non serializzabile", è necessario passarlo attraverso foreachPartition in modo da poter effettuare una connessione al database su ciascun nodo prima di eseguire l'elaborazione.

mydstream.foreachRDD(rdd => { 
     rdd.foreachPartition{ 
      val mongoClient = MongoClient("localhost", 27017) 
      val db = mongoClient(mongoDatabase) 
      val coll = db(mongoCollection) 
      // ssc is my StreamingContext 
      val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) }}) 
+0

non funzionerà, dato che ssc non è serializzabile. –

+0

Puoi provare a creare il tuo ssc all'interno del foreachRDD prima di rdd.foreachPartition 'val ssc = StreamingContext.getOrCreate (checkpointdirectory, functionToCreateContext _)' – Rami

Problemi correlati