2016-01-22 9 views
16

Quando si esegue il codice di analisi con un set di dati da 1 GB, l'operazione viene completata senza errori. Ma, quando cerco 25 GB di dati alla volta, ottengo sotto errori. Sto cercando di capire come posso evitare i fallimenti sotto. Felice di ascoltare qualsiasi suggerimento o idea.FetchFailedException o MetadataFetchFailedException durante l'elaborazione di un grande set di dati

errori differenti,

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 

org.apache.spark.shuffle.FetchFailedException: Failed to connect to ip-xxxxxxxx 

org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer{file=/mnt/yarn/nm/usercache/xxxx/appcache/application_1450751731124_8446/blockmgr-8a7b17b8-f4c3-45e7-aea8-8b0a7481be55/08/shuffle_0_224_0.data, offset=12329181, length=2104094} 

grappolo d'affitto:

Filato: 8 nodi
nuclei totali: 64
memoria: 500 GB
Spark Versione: 1.5

Spark submit statement :

spark-submit --master yarn-cluster \ 
         --conf spark.dynamicAllocation.enabled=true \ 
         --conf spark.shuffle.service.enabled=true \ 
         --executor-memory 4g \ 
         --driver-memory 16g \ 
         --num-executors 50 \ 
         --deploy-mode cluster \ 
         --executor-cores 1 \ 
         --class my.parser \ 
         myparser.jar \ 
         -input xxx \ 
         -output xxxx \ 

Uno stack trace:

at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460) 
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456) 
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) 
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) 
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) 
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456) 
at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183) 
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47) 
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
at org.apache.spark.scheduler.Task.run(Task.scala:88) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

risposta

48

Questo errore è quasi garantito per essere causato da problemi di memoria sui vostri esecutori. Posso pensare ad un paio di modi per affrontare questi tipi di problemi.

1) È possibile provare a eseguire più partizioni (fare un repartition sul proprio dataframe). I problemi di memoria si presentano in genere quando una o più partizioni contengono più dati di quelli che si adattano alla memoria.

2) Mi accorgo che non è stato impostato in modo esplicito spark.yarn.executor.memoryOverhead, quindi verrà impostato automaticamente su max(386, 0.10* executorMemory) che nel tuo caso sarà 400 MB. Mi sembra poco. Vorrei provare ad aumentare 1GB (si noti che se si aumenta memoryOverhead a 1 GB, è necessario ridurre --executor-memory a 3 GB)

3) Cercare i file di registro sui nodi in errore. Vuoi cercare il testo "Killing container". Se vedi che il testo "va oltre i limiti di memoria fisica", aumentando memoryOverhead, nella mia esperienza, risolverò il problema.

+0

Il numero 2) conta anche in modalità standalone. Se sì, come possiamo impostarlo. Non riesco a trovare var simili in modalità standalone. – Laeeq

3

Ho anche avuto dei buoni risultati aumentando il timeout Spark spark.network.timeout ad un valore più grande come 800. Il valore predefinito di 120 secondi causerà il timeout di molti dei tuoi esecutori quando il carico è elevato.

Problemi correlati