2015-10-02 20 views
16

Sono nuovo di Scala. Come mai la funzione "mappa" non è serializzabile? Come renderlo serializzabile? Ad esempio, se il mio codice è come qui di seguito:La mappa non può essere serializzabile in scala?

val data = sc.parallelize(List(1,4,3,5,2,3,5)) 

def myfunc(iter: Iterator[Int]) : Iterator[Int] = { 
    val lst = List(("a", 1),("b", 2),("c",3), ("a",2)) 
    var res = List[Int]() 
    while (iter.hasNext) { 
    val cur = iter.next 
    val a = lst.groupBy(x => x._1).mapValues(_.size) 
    //val b= a.map(x => x._2) 
    res = res ::: List(cur) 
    } 
    res.iterator 
} 

data.mapPartitions(myfunc).collect 

Se io commento dalla riga

val b= a.map(x => x._2) 

Il codice restituisce un'eccezione:

org.apache.spark.SparkException: Task not serializable 
Caused by: java.io.NotSerializableException: scala.collection.immutable.MapLike$$anon$2 
Serialization stack: 
    - object not serializable (class: scala.collection.immutable.MapLike$$anon$2, value: Map(1 -> 3)) 
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: a, type: interface scala.collection.immutable.Map) 

Grazie mille.

+0

Per quanto posso dire, non è riproducibile su Spark 1.2.0 - 1.5.0. Potrebbe fornire alcuni dettagli di configurazione? Come si esegue questo codice? – zero323

+0

Ciao zero323, ho eseguito questo codice direttamente nella shell di Scala con Spark 1.5. Ho anche eseguito il codice in Scala Shell di Spark 1.0.1 e lo stesso problema esiste. – Carter

+1

Sospetto che questo non sia il vero codice che dà l'errore? Il tuo 'lst' è davvero una semplice lista nel codice reale? O un altro RDD? –

risposta

37

E 'ben noto bug Scala: https://issues.scala-lang.org/browse/SI-7005 Mappa # mapValues ​​non è serializzabile

abbiamo questo problema nelle nostre applicazioni Spark, map(identity) risolve il problema

rdd.groupBy(_.segment).mapValues(v => ...).map(identity) 
+1

Questa era la mia ipotesi, ma [secondo OP non risolve il problema] (http://stackoverflow.com/questions/32900862/map-can-not- be-serializzabile-in-scala? noredirect = 1 # comment53635061_32900862). – zero323

+2

Questo ha funzionato per me. Grazie! Non l'avrei mai capito. Perché funziona? –

+0

@Mattew Perché 'mapValues' è pigro (https://stackoverflow.com/questions/39474314/is-scala-mapvalues-lazy) – Federico

1

Hai provato l'esecuzione di questo stesso codice in un applicazione? Sospetto che questo sia un problema con la scintilla. Se si vuole farlo funzionare nel guscio scintilla allora si potrebbe provare avvolgendo la definizione di myfunc e la sua applicazione tra parentesi graffe in questo modo:

val data = sc.parallelize(List(1,4,3,5,2,3,5)) 

val result = { 
    def myfunc(iter: Iterator[Int]) : Iterator[Int] = { 
    val lst = List(("a", 1),("b", 2),("c",3), ("a",2)) 
    var res = List[Int]() 
    while (iter.hasNext) { 
     val cur = iter.next 
     val a = lst.groupBy(x => x._1).mapValues(_.size) 
     val b= a.map(x => x._2) 
     res = res ::: List(cur) 
    } 
    res.iterator 
    } 
    data.mapPartitions(myfunc).collect 
} 
+0

Ho provato questo codice solo in Spark Shell. Sembra che sia il problema con Shell. – Carter

3

L'effettiva attuazione della funzione mapValues ​​è fornito qui sotto e come si può vedere non è serializzabile e crea solo una vista non una corretta esistenza dei dati e quindi si sta ottenendo questo errore. I mapValues ​​della situazione hanno molti vantaggi.

protected class MappedValues[C](f: B => C) extends AbstractMap[A, C] with DefaultMap[A, C] { 
override def foreach[D](g: ((A, C)) => D): Unit = for ((k, v) <- self) g((k, f(v))) 
def iterator = for ((k, v) <- self.iterator) yield (k, f(v)) 
override def size = self.size 
override def contains(key: A) = self.contains(key) 
def get(key: A) = self.get(key).map(f) 
}