2015-09-28 10 views
6

Quando si esegue sparkJob su un cluster dopo una determinata dimensione di dati (~ 2,5 gb) Ricevo "Job annullato perché SparkContext è stato chiuso" o "executor perso ". Quando guardo il filo gui, vedo che il lavoro che è stato ucciso ha avuto successo. Non ci sono problemi durante l'esecuzione su dati che è 500mb. Stavo cercando una soluzione e ho scoperto che: - "sembra che il filato uccida alcuni degli esecutori perché richiedono più memoria del previsto"."sparkContext è stato arrestato" durante l'esecuzione di una scintilla su un set di dati di grandi dimensioni

Qualche suggerimento su come eseguirne il debug?

comando che presento il mio lavoro scintilla con:

/opt/spark-1.5.0-bin-hadoop2.4/bin/spark-submit --driver-memory 22g --driver-cores 4 --num-executors 15 --executor-memory 6g --executor-cores 6 --class sparkTesting.Runner --master yarn-client myJar.jar jarArguments 

e le impostazioni sparkContext

val sparkConf = (new SparkConf() 
    .set("spark.driver.maxResultSize", "21g") 
    .set("spark.akka.frameSize", "2011") 
    .set("spark.eventLog.enabled", "true") 
    .set("spark.eventLog.enabled", "true") 
    .set("spark.eventLog.dir", configVar.sparkLogDir) 
    ) 

codice semplificato che non riesce assomiglia a quello

val hc = new org.apache.spark.sql.hive.HiveContext(sc) 
val broadcastParser = sc.broadcast(new Parser()) 

val featuresRdd = hc.sql("select "+ configVar.columnName + " from " + configVar.Table +" ORDER BY RAND() LIMIT " + configVar.Articles) 
val myRdd : org.apache.spark.rdd.RDD[String] = featuresRdd.map(doSomething(_,broadcastParser)) 

val allWords= featuresRdd 
    .flatMap(line => line.split(" ")) 
    .count 

val wordQuantiles= featuresRdd 
    .flatMap(line => line.split(" ")) 
    .map(word => (word, 1)) 
    .reduceByKey(_ + _) 
    .map(pair => (pair._2 , pair._2)) 
    .reduceByKey(_+_) 
    .sortBy(_._1) 
    .collect 
    .scanLeft((0,0.0)) ((res,add) => (add._1, res._2+add._2)) 
    .map(entry => (entry._1,entry._2/allWords)) 

val dictionary = featuresRdd 
    .flatMap(line => line.split(" ")) 
    .map(word => (word, 1)) 
    .reduceByKey(_ + _) // here I have Rdd of word,count tuples 
    .filter(_._2 >= moreThan) 
    .filter(_._2 <= lessThan) 
    .filter(_._1.trim!=("")) 
    .map(_._1) 
    .zipWithIndex 
    .collect 
    .toMap 

e l'errore di stack

Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:703) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702) 
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) 
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:702) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1511) 
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84) 
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1435) 
at org.apache.spark.SparkContext$$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1715) 
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185) 
at org.apache.spark.SparkContext.stop(SparkContext.scala:1714) 
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:146) 
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910) 
at org.apache.spark.rdd.RDD.count(RDD.scala:1121) 
at sparkTesting.InputGenerationAndDictionaryComputations$.createDictionary(InputGenerationAndDictionaryComputations.scala:50) 
at sparkTesting.Runner$.main(Runner.scala:133) 
at sparkTesting.Runner.main(Runner.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:483) 
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) 
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
+4

Nella mia esperienza, questo è quasi sempre dovuto alle eccezioni di OOM. Prova a dare un'occhiata ai file di registro sulle macchine del singolo esecutore. –

+1

Vorrei stampare una traccia dal tuo lavoro e monitorare la dimensione dell'heap JVM con alcuni strumenti di utilità Java: jstat, jstatd, jconsole ... per saperne di più sulla limitazione. Se hai ancora la memoria fisica, puoi aumentare la dimensione della memoria JVM prima di avviare la tua app! Puoi ridimensionare le tue raccolte in base alla dimensione dell'heap ottimizzata. –

risposta

4

Trovato la risposta.

Il mio tavolo è stato salvato come file avro da 20 gb. Quando gli esecutori hanno cercato di aprirlo. Ognuno di loro ha dovuto caricare 20 GB in memoria. Risolto utilizzando csv anziché avro

1

I sintomi sono tipici di un errore OutOfMemory in una delle attività executor. Prova ad aumentare la memoria per executor quando lavori lauching. Vedi parametro --executor-memory di saprk-submit, spark-shell ecc. Il valore predefinito è 1G

1

Un'altra possibile causa dell'errore "SparkContext è arresto" è che si sta importando un file jar dopo aver valutato un altro codice. (Questo può accadere solo in Spark Notebook.)

Per risolvere il problema, sposta tutte le tue istruzioni :cp myjar.jar all'inizio del file.

Problemi correlati