2014-11-15 16 views
7

Sto usando hadoop 2.4.1 e Spark 1.1.0. Ho caricato un set di dati di revisione cibo per HDFS da here e poi ho usato il seguente codice per leggere il file e di processo che sul guscio scintilla:Perché spark-shell lancia ArrayIndexOutOfBoundsException durante la lettura di un file di grandi dimensioni da HDFS?

import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.mapreduce.Job 
import org.apache.hadoop.io.{LongWritable, Text} 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat 

var path = "hdfs:///user/hduser/finefoods.txt" 
val conf = new Configuration 
conf.set("textinputformat.record.delimiter", "\n\n") 
var dataset = sc.newAPIHadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf).map(_._2.toString) 
var datasetObj = dataset.map{ rowStr => rowStr.split("\n")}  
var tupleSet = datasetObj.map(strArr => strArr.map(elm => elm.split(": ")(1))).map(arr => (arr(0),arr(1),arr(4).toDouble)) 
tupleSet.groupBy(t => t._2) 

Quando eseguo l'ultima riga tupleSet.groupBy(t => t._2), il guscio scintilla tiri la seguente eccezione:

scala> tupleSet.groupBy(t => t._2).first() 
14/11/15 22:46:59 INFO spark.SparkContext: Starting job: first at <console>:28 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Registering RDD 11 (groupBy at <console>:28) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Got job 1 (first at <console>:28) with 1 output partitions (allowLocal=true) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Final stage: Stage 1(first at <console>:28) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 2) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Missing parents: List(Stage 2) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Submitting Stage 2 (MappedRDD[11] at groupBy at <console>:28), which has no missing parents 
14/11/15 22:46:59 INFO storage.MemoryStore: ensureFreeSpace(3592) called with curMem=221261, maxMem=278302556 
14/11/15 22:46:59 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.5 KB, free 265.2 MB) 
14/11/15 22:46:59 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from Stage 2 (MappedRDD[11] at groupBy at <console>:28) 
14/11/15 22:46:59 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 3 tasks 
14/11/15 22:46:59 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 3, localhost, ANY, 1221 bytes) 
14/11/15 22:46:59 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 2.0 (TID 4, localhost, ANY, 1221 bytes) 
14/11/15 22:46:59 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 3) 
14/11/15 22:46:59 INFO executor.Executor: Running task 1.0 in stage 2.0 (TID 4) 
14/11/15 22:46:59 INFO rdd.NewHadoopRDD: Input split: hdfs://10.12.0.245/user/hduser/finefoods.txt:0+134217728 
14/11/15 22:46:59 INFO rdd.NewHadoopRDD: Input split: hdfs://10.12.0.245/user/hduser/finefoods.txt:134217728+134217728 
14/11/15 22:47:02 ERROR executor.Executor: Exception in task 1.0 in stage 2.0 (TID 4) 
java.lang.ArrayIndexOutOfBoundsException 
14/11/15 22:47:02 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 2.0 (TID 5, localhost, ANY, 1221 bytes) 
14/11/15 22:47:02 INFO executor.Executor: Running task 2.0 in stage 2.0 (TID 5) 
14/11/15 22:47:02 INFO rdd.NewHadoopRDD: Input split: hdfs://10.12.0.245/user/hduser/finefoods.txt:268435456+102361028 
14/11/15 22:47:02 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 2.0 (TID 4, localhost): java.lang.ArrayIndexOutOfBoundsException: 

14/11/15 22:47:02 ERROR scheduler.TaskSetManager: Task 1 in stage 2.0 failed 1 times; aborting job 
14/11/15 22:47:02 INFO scheduler.TaskSchedulerImpl: Cancelling stage 2 
14/11/15 22:47:02 INFO scheduler.TaskSchedulerImpl: Stage 2 was cancelled 
14/11/15 22:47:02 INFO executor.Executor: Executor is trying to kill task 2.0 in stage 2.0 (TID 5) 
14/11/15 22:47:02 INFO executor.Executor: Executor is trying to kill task 0.0 in stage 2.0 (TID 3) 
14/11/15 22:47:02 INFO scheduler.DAGScheduler: Failed to run first at <console>:28 
14/11/15 22:47:02 INFO executor.Executor: Executor killed task 0.0 in stage 2.0 (TID 3) 
14/11/15 22:47:02 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 3, localhost): TaskKilled (killed intentionally) 
14/11/15 22:47:02 INFO executor.Executor: Executor killed task 2.0 in stage 2.0 (TID 5) 
14/11/15 22:47:02 WARN scheduler.TaskSetManager: Lost task 2.0 in stage 2.0 (TID 5, localhost): TaskKilled (killed intentionally) 
14/11/15 22:47:02 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 1 times, most recent failure: Lost task 1.0 in stage 2.0 (TID 4, localhost): java.lang.ArrayIndexOutOfBoundsException: 

Driver stacktrace: 
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) 
     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
     at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
     at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

Ma quando io uso dataset manichino come il seguente, funziona bene:

var tupleSet = sc.parallelize(List(
("B001E4KFG0","A3SGXH7AUHU8GW",3.0), 
("B001E4KFG1","A3SGXH7AUHU8GW",4.0), 
("B001E4KFG2","A3SGXH7AUHU8GW",4.0), 
("B001E4KFG3","A3SGXH7AUHU8GW",4.0), 
("B001E4KFG4","A3SGXH7AUHU8GW",5.0), 
("B001E4KFG5","A3SGXH7AUHU8GW",5.0), 
("B001E4KFG0","bbb",5.0) 
)) 

Qualsiasi idea w hy?

risposta

9

Probabilmente c'è una voce nel set di dati che non segue il formato e quindi: elm.split(": ")(1) fallisce, perché non c'è alcun elemento in quell'indice.

È possibile evitare questo errore controllando i risultati della divisione prima di accedere all'indice (1). Un modo di fare che potrebbe essere qualcosa di simile:

var tupleSet = datasetObj.map(elem => elm.split(": ")).collect{case x if (x.length>1) x(1)} 

Una nota: I suoi esempi non sembrano corrispondere la pipeline di analisi nel codice. Non contengono i token ":".

Poiché le trasformazioni sono pigre, Spark non ti dirà molto del tuo set di dati di input (e potresti non accorgertene) solo fino all'esecuzione di un'azione come groupBy().

0

Potrebbe anche essere dovuto a righe vuote/vuote nel set di dati. E stai applicando una funzione di divisione sui dati. In tal caso, filtra le linee vuote.

esempio: myrdd.filter (. _ Non vuoto) .map (...)

0

Ho avuto un problema simile quando ero Conversione di un dati di log in utilizzando dataframe pySpark.

Quando una voce di registro non è valida, ho restituito un valore nullo anziché un'istanza di riga. Prima di convertire in dataframe, ho filtrato questi valori nulli. Ma, ancora, ho avuto il problema sopra. Infine, l'errore è andato via quando ho restituito una riga con valori nulli invece di un singolo valore nullo.

Pseudo codice qui sotto:

ha funzionato:

rdd = Parse log (log lines to Rows if valid else None) 
filtered_rdd = rdd.filter(lambda x:x!=None) 
logs = sqlContext.inferSchema(filtered_rdd) 

Lavorato:

rdd = Parse log (log lines to Rows if valid else Row(None,None,...)) 
logs = sqlContext.inferSchema(rdd) 
filtered_rdd = logs.filter(logs['id'].isNotNull()) 
Problemi correlati