2014-05-21 4 views
5

sessionIdList è di tipo:NullPointerException in Scala Spark, sembra essere causato da tipo di raccolta?

scala> sessionIdList res19: org.apache.spark.rdd.RDD [String] = MappedRDD [17] distinti in: 30

Quando si tenta di eseguire qui sotto code :

val x = sc.parallelize(List(1,2,3)) 
val cartesianComp = x.cartesian(x).map(x => (x)) 

val kDistanceNeighbourhood = sessionIdList.map(s => { 
    cartesianComp.filter(v => v != null) 
}) 

kDistanceNeighbourhood.take(1) 

ricevo un'eccezione:

14/05/21 16:20:46 ERROR Executor: Exception in task ID 80 
java.lang.NullPointerException 
     at org.apache.spark.rdd.RDD.filter(RDD.scala:261) 
     at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:38) 
     at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) 
     at scala.collection.Iterator$class.foreach(Iterator.scala:727) 

Tuttavia se uso:

0.123.
val l = sc.parallelize(List("1","2")) 
val kDistanceNeighbourhood = l.map(s => {  
    cartesianComp.filter(v => v != null) 
}) 

kDistanceNeighbourhood.take(1) 

Poi viene visualizzato un'eccezione

La differenza tra i due frammenti di codice è che nel primo pezzo sessionIdList è di tipo:

res19: org.apache.spark.rdd.RDD[String] = MappedRDD[17] at distinct at <console>:30 

e in secondo frammento di "l" è di tipo

scala> l 
res13: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[32] at parallelize at <console>:12 

Perché si verifica questo errore?

Devo convertire sessionIdList in ParallelCollectionRDD per risolvere questo problema?

+0

Riesci a rendere il tuo codice autonomo? –

+0

@IvanVergiliev tutto il codice tranne un ParallelCollectionRDD popolato è incluso per ricreare l'eccezione. Non so come creare un popolato ParallelCollectionRDD –

risposta

9

Spark non supporta il nesting di RDD (vedere https://stackoverflow.com/a/14130534/590203 per un'altra occorrenza dello stesso problema), quindi non è possibile eseguire trasformazioni o azioni su RDD all'interno di altre operazioni RDD.

Nel primo caso, viene visualizzata una eccezione NullPointerException generata dall'operatore quando tenta di accedere a un oggetto SparkContext presente solo sul driver e non sugli operai.

Nel secondo caso, la mia impressione è che il lavoro è stato eseguito localmente sul conducente e ha funzionato per puro caso.

+0

è garantito che un NPE verrà lanciato se si utilizza l'annidamento RDD? La ragione per chiedere è che sto usando il metodo cartesiano e la funzione che è stata eseguita sull'RDD chiama altre funzioni che sono nidificate ma sembra funzionare correttamente. Sembra che questo problema si verifichi solo per operazioni RDD nidificate e non per chiamate di funzioni nidificate? –

+0

Non è garantito, ma penso che probabilmente dovrebbe essere al fine di impedire agli utenti di scrivere programmi che funzionano bene in modalità 'local' ma falliscono su un cluster, o che hanno successo o falliscono in base ai capricci di dove sono pianificate le attività. –

3

È una domanda ragionevole e l'ho sentito chiedere abbastanza volte. Proverò a provare a spiegare perché è vero, perché potrebbe essere d'aiuto.

Gli RDD nidificati eseguono sempre un'eccezione nella produzione. Le chiamate di funzioni nidificate come penso che le stiate descrivendo qui, se ciò significa chiamare un'operazione RDD all'interno di un'operazione RDD, causeranno anche errori di causa poiché è effettivamente la stessa cosa. (Gli RDD sono immutabili, quindi eseguire un'operazione RDD come una "mappa" equivale a creare un nuovo RDD.) La possibilità di creare RDD nidificati è una conseguenza necessaria del modo in cui un RDD è definito e del modo in cui Spark Application è impostare.

Un RDD è una raccolta distribuita di oggetti (denominati partizioni) che vivono su Spark Executor. Gli esecutori Spark non possono comunicare tra loro, solo con il driver Spark. Le operazioni RDD sono tutte calcolate in pezzi su queste partizioni. Poiché l'ambiente executor di RDD non è ricorsivo (ad esempio, è possibile configurare un driver Spark su un esecutore spark con sub executor), nessuno dei due può eseguire un RDD.

Nel programma, è stata creata una raccolta distribuita di partizioni di numeri interi. Quindi stai eseguendo un'operazione di mappatura. Quando il driver Spark vede un'operazione di mappatura, invia le istruzioni per eseguire il mapping agli esecutori, che eseguono la trasformazione su ogni partizione in parallelo.Ma la tua mappatura non può essere fatta, perché su ogni partizione stai cercando di chiamare "intero RDD" per eseguire un'altra operazione distribuita. Questo non può essere fatto, perché ogni partizione non ha accesso alle informazioni sulle altre partizioni, se così fosse, il calcolo non potrebbe essere eseguito in parallelo.

Cosa invece è possibile fare, poiché i dati necessari nella mappa sono probabilmente di dimensioni ridotte (poiché si sta facendo un filtro e il filtro non richiede alcuna informazione su sessionIdList) è innanzitutto necessario filtrare l'elenco ID sessione. Quindi raccogliere quella lista per il conducente. Quindi trasmetterlo agli esecutori, dove è possibile utilizzarlo sulla mappa. Se l'elenco sessionID è troppo grande, probabilmente dovrai fare un join.

Problemi correlati