2014-11-15 16 views

Sto usando hadoop 2.4.1 e Spark 1.1.0. Ho caricato un set di dati di revisione cibo per HDFS da here e poi ho usato il seguente codice per leggere il file e di processo che sul guscio scintilla:Perché spark-shell lancia ArrayIndexOutOfBoundsException durante la lettura di un file di grandi dimensioni da HDFS?

import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.mapreduce.Job 
import org.apache.hadoop.io.{LongWritable, Text} 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat 

var path = "hdfs:///user/hduser/finefoods.txt" 
val conf = new Configuration 
conf.set("textinputformat.record.delimiter", "\n\n") 
var dataset = sc.newAPIHadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf).map(_._2.toString) 
var datasetObj = dataset.map{ rowStr => rowStr.split("\n")}  
var tupleSet = datasetObj.map(strArr => strArr.map(elm => elm.split(": ")(1))).map(arr => (arr(0),arr(1),arr(4).toDouble)) 
tupleSet.groupBy(t => t._2) 

Quando eseguo l'ultima riga tupleSet.groupBy(t => t._2), il guscio scintilla tiri la seguente eccezione:

scala> tupleSet.groupBy(t => t._2).first() 
14/11/15 22:46:59 INFO spark.SparkContext: Starting job: first at <console>:28 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Registering RDD 11 (groupBy at <console>:28) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Got job 1 (first at <console>:28) with 1 output partitions (allowLocal=true) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Final stage: Stage 1(first at <console>:28) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 2) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Missing parents: List(Stage 2) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Submitting Stage 2 (MappedRDD[11] at groupBy at <console>:28), which has no missing parents 
14/11/15 22:46:59 INFO storage.MemoryStore: ensureFreeSpace(3592) called with curMem=221261, maxMem=278302556 
14/11/15 22:46:59 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.5 KB, free 265.2 MB) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from Stage 2 (MappedRDD[11] at groupBy at <console>:28) 
14/11/15 22:46:59 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 3 tasks 
14/11/15 22:46:59 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 3, localhost, ANY, 1221 bytes) 
14/11/15 22:46:59 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 2.0 (TID 4, localhost, ANY, 1221 bytes) 
14/11/15 22:46:59 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 3) 
14/11/15 22:46:59 INFO executor.Executor: Running task 1.0 in stage 2.0 (TID 4) 
14/11/15 22:46:59 INFO rdd.NewHadoopRDD: Input split: hdfs:// 
14/11/15 22:46:59 INFO rdd.NewHadoopRDD: Input split: hdfs:// 
14/11/15 22:47:02 ERROR executor.Executor: Exception in task 1.0 in stage 2.0 (TID 4) 
14/11/15 22:47:02 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 2.0 (TID 5, localhost, ANY, 1221 bytes) 
14/11/15 22:47:02 INFO executor.Executor: Running task 2.0 in stage 2.0 (TID 5) 
14/11/15 22:47:02 INFO rdd.NewHadoopRDD: Input split: hdfs:// 
14/11/15 22:47:02 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 2.0 (TID 4, localhost): java.lang.ArrayIndexOutOfBoundsException: 

14/11/15 22:47:02 ERROR scheduler.TaskSetManager: Task 1 in stage 2.0 failed 1 times; aborting job 
14/11/15 22:47:02 INFO scheduler.TaskSchedulerImpl: Cancelling stage 2 
14/11/15 22:47:02 INFO scheduler.TaskSchedulerImpl: Stage 2 was cancelled 
14/11/15 22:47:02 INFO executor.Executor: Executor is trying to kill task 2.0 in stage 2.0 (TID 5) 
14/11/15 22:47:02 INFO executor.Executor: Executor is trying to kill task 0.0 in stage 2.0 (TID 3) 
14/11/15 22:47:02 INFO scheduler.DAGScheduler: Failed to run first at <console>:28 
14/11/15 22:47:02 INFO executor.Executor: Executor killed task 0.0 in stage 2.0 (TID 3) 
14/11/15 22:47:02 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 3, localhost): TaskKilled (killed intentionally) 
14/11/15 22:47:02 INFO executor.Executor: Executor killed task 2.0 in stage 2.0 (TID 5) 
14/11/15 22:47:02 WARN scheduler.TaskSetManager: Lost task 2.0 in stage 2.0 (TID 5, localhost): TaskKilled (killed intentionally) 
14/11/15 22:47:02 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 1 times, most recent failure: Lost task 1.0 in stage 2.0 (TID 4, localhost): java.lang.ArrayIndexOutOfBoundsException: 

Driver stacktrace: 
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) 
     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
     at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
     at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

Ma quando io uso dataset manichino come il seguente, funziona bene:

var tupleSet = sc.parallelize(List(

Qualsiasi idea w hy?



Probabilmente c'è una voce nel set di dati che non segue il formato e quindi: elm.split(": ")(1) fallisce, perché non c'è alcun elemento in quell'indice.

È possibile evitare questo errore controllando i risultati della divisione prima di accedere all'indice (1). Un modo di fare che potrebbe essere qualcosa di simile:

var tupleSet = datasetObj.map(elem => elm.split(": ")).collect{case x if (x.length>1) x(1)} 

Una nota: I suoi esempi non sembrano corrispondere la pipeline di analisi nel codice. Non contengono i token ":".

Poiché le trasformazioni sono pigre, Spark non ti dirà molto del tuo set di dati di input (e potresti non accorgertene) solo fino all'esecuzione di un'azione come groupBy().


Potrebbe anche essere dovuto a righe vuote/vuote nel set di dati. E stai applicando una funzione di divisione sui dati. In tal caso, filtra le linee vuote.

esempio: myrdd.filter (. _ Non vuoto) .map (...)


Ho avuto un problema simile quando ero Conversione di un dati di log in utilizzando dataframe pySpark.

Quando una voce di registro non è valida, ho restituito un valore nullo anziché un'istanza di riga. Prima di convertire in dataframe, ho filtrato questi valori nulli. Ma, ancora, ho avuto il problema sopra. Infine, l'errore è andato via quando ho restituito una riga con valori nulli invece di un singolo valore nullo.

Pseudo codice qui sotto:

ha funzionato:

rdd = Parse log (log lines to Rows if valid else None) 
filtered_rdd = rdd.filter(lambda x:x!=None) 
logs = sqlContext.inferSchema(filtered_rdd) 


rdd = Parse log (log lines to Rows if valid else Row(None,None,...)) 
logs = sqlContext.inferSchema(rdd) 
filtered_rdd = logs.filter(logs['id'].isNotNull()) 
Problemi correlati