2015-07-21 15 views
7

quando sto cercando di eseguire sul questa cartella che mi sta gettando ExecutorLostFailure ogniExecutorLostFailure errore durante l'esecuzione di un compito in Spark

Ciao io sono un principiante in Spark. Sto cercando di eseguire un lavoro su Spark 1.4.1 con 8 nodi slave con 11,7 GB di memoria ciascuno su disco da 3,2 GB. Sto eseguendo l'attività Spark da uno dei nodi slave (da 8 nodi) (quindi con 0,7 frazione di memoria di circa 4,8 gb disponibile solo su ciascun nodo) e utilizzando Mesos come Cluster Manager. Sto usando questa configurazione:

spark.master mesos://uc1f-bioinfocloud-vamp-m-1:5050 
spark.eventLog.enabled true 
spark.driver.memory 6g 
spark.storage.memoryFraction 0.7 
spark.core.connection.ack.wait.timeout 800 
spark.akka.frameSize 50 
spark.rdd.compress true 

Sto provando a fare funzionare Spark MLlib algoritmo Naive Bayes su una cartella circa 14 GB di dati. (Non c'è alcun problema quando sto eseguendo l'attività su una cartella da 6 GB) Sto leggendo questa cartella da Google Storage come RDD e dando 32 come parametro di partizione. (Ho provato ad aumentare anche la partizione). Quindi usa TF per creare un vettore di funzionalità e prevedere in base a quello. Ma quando sto provando a eseguirlo su questa cartella mi sta lanciando ExecutorLostFailure ogni volta. Ho provato diverse configurazioni ma nulla aiuta. Forse mi manca qualcosa di molto semplice ma non in grado di capirlo. Qualsiasi aiuto o suggerimento sarà molto prezioso.

Log è:

15/07/21 01:18:20 ERROR TaskSetManager: Task 3 in stage 2.0 failed 4 times; aborting job  
15/07/21 01:18:20 INFO TaskSchedulerImpl: Cancelling stage 2  
15/07/21 01:18:20 INFO TaskSchedulerImpl: Stage 2 was cancelled  
15/07/21 01:18:20 INFO DAGScheduler: ResultStage 2 (collect at /opt/work/V2ProcessRecords.py:213) failed in 28.966 s  
15/07/21 01:18:20 INFO DAGScheduler: Executor lost: 20150526-135628-3255597322-5050-1304-S8 (epoch 3)  
15/07/21 01:18:20 INFO BlockManagerMasterEndpoint: Trying to remove executor 20150526-135628-3255597322-5050-1304-S8 from BlockManagerMaster.  
15/07/21 01:18:20 INFO DAGScheduler: Job 2 failed: collect at /opt/work/V2ProcessRecords.py:213, took 29.013646 s  
Traceback (most recent call last):  
    File "/opt/work/V2ProcessRecords.py", line 213, in <module> 
    secondPassRDD = firstPassRDD.map(lambda (name, title, idval, pmcId, pubDate, article, tags , author, ifSigmaCust, wclass): (str(name), title, idval, pmcId, pubDate, article, tags , author, ifSigmaCust , "Yes" if ("PMC" + pmcId) in rddNIHGrant else ("No") , wclass)).collect()  
    File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 745, in collect  
    File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__  
    File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 2.0 failed 4 times, most recent failure: Lost task 3.3 in stage 2.0 (TID 12, vamp-m-2.c.quantum-854.internal): ExecutorLostFailure (executor 20150526-135628-3255597322-5050-1304-S8 lost)  
Driver stacktrace:  
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266) 
     at  org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256) 
     at  scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at  org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411) 
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 

15/07/21 01:18:20 INFO BlockManagerMaster: Removed 20150526-135628-3255597322-5050-1304-S8 successfully in removeExecutor 
15/07/21 01:18:20 INFO DAGScheduler: Host added was in lost list earlier:vamp-m-2.c.quantum-854.internal 
Jul 21, 2015 1:01:15 AM INFO: parquet.hadoop.ParquetFileReader: Initiating action with parallelism: 5 
15/07/21 01:18:20 INFO SparkContext: Invoking stop() from shutdown hook 



{"Event":"SparkListenerTaskStart","Stage ID":2,"Stage Attempt ID":0,"Task Info":{"Task ID":11,"Index":6,"Attempt":2,"Launch Time":1437616381852,"Executor ID":"20150526-135628-3255597322-5050-1304-S8","Host":"uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Accumulables":[]}} 

{ "eventi": "SparkListenerExecutorRemoved", "Timestamp": 1.437.616,389696 millions, "Esecutore ID": "20150526-135628-3255597322-5050-1304-S8", "rimosso Motivo ":" Esecutore smarrito "} {" Evento ":" SparkListenerTaskEnd "," ID stage ": 2," ID tentativo stage ": 0," Tipo di attività ":" ResultTask "," Motivo fine attività ": {" Motivo ":" ExecutorLostFailure "," Executor ID ":" 20150526-135628-3255597322-5050-1304-S8 "}," Info attività ": {" ID attività ": 11," Indice ": 6," Tentativo ": 2, "Ora di avvio": 1437616381852, "ID Executor": "20150526-135628-3255597322-5050-1304-S8", "Host": "uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854 .internal "," Locality ":" PROCESS_LOCAL "," Speculative ": false," Ottenere risultato ": 0," F inish Time ": 1437616389697," Failed ": true," Accumulables ": []}} {" Event ":" SparkListenerExecutorAdded "," Timestamp ": 1437616389707," Executor ID ":" 20150526-135628-3255597322-5050- 1304-S8 "," Informazioni Executor ": {" Host ":" uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal "," Total Cores ": 1," Log Url ": { }}} {"Evento": "SparkListenerTaskStart", "ID stage": 2, "ID tentativo stage": 0, "Informazioni attività": {"ID compito": 12, "Indice": 6, "Tentativo" : 3, "Ora di avvio": 1437616389702, "ID Executor": "20150526-135628-3255597322-5050-1304-S8", "Host": "uc1f-bioinfocloud-vamp-m-2.c.quantum-device- 854. interno "," Località ":" PROCESS_LOCAL "," Speculativo ": falso," Ottenere tempo di risultato ": 0," Ora di fine ": 0," Non riuscito ": falso," Accumulabili ": []}} { "Event": "SparkListenerExecutorRemoved", "Timestamp": 1437616397743, "Executor ID": "20150526-135628-3255597322-5050-1304-S8", "Removed Reason": "Lost executor"} {"Evento": " SparkListenerTaskEnd "" ID stage ": 2," ID tentativo stage ": 0," Tipo di attività ":" ResultTask "," Motivo finale attività ": {" Motivo ":" ExecutorLostFailure "," ID esecutore ":" 20150526-135628-3255597322- 5050-1304-S8 "}," Info attività ": {" ID attività ": 12," Indice ": 6," Tentativo ": 3," Ora di avvio ": 1437616389702," ID Executor ":" 20150526-135628- 3255597322-5050-1304-S8" , "host": "-bioinfocloud-tomaia-m-2.c.quantum-dispositivo-854.internal uc1f", "Località": "PROCESS_LOCAL", "speculativo": false," Ottenere tempo risultato ": 0," Ora di fine ": 1437616397743," Non riuscito ": vero," Accumulabili ": []}} {" Evento ":" SparkListenerStageCompleted "," Informazioni sullo stage ": {" ID stage ": 2 , "ID tentativo di stage": 0, "Nome fase": "raccoglie a /opt/work/V2ProcessRecords.py:215","Numero di attività": 72, "Informazioni RDD": [{"ID RDD": 6 , "Nome": "PythonRDD", "ID padre": [0], "Livello di archiviazione": {"Usa disco": falso, "Usa memoria": falso, "Usa archivioBlocco esterno": falso, "Deserializzato": falso , "Replica": 1}, "Numero di partizioni": 72, "Numero di partizioni memorizzate nella cache": 0, "Dimensione della memoria": 0, "Dimensione del ExternalBlockStore": 0, "Dimensione del disco": 0}, {"RDD ID ": 0," Nome ":" gs: // uc1f-bioinfocloud-vamp-m/letteratura/xml/P */*.nxml "," Scope ":" {\ "id \": \ "0 \", \ "name \": \ "wholeTextFiles \"} "," Parent IDs ": []," Livello di archiviazione ": {" Use Disk ": false," Use Memory ": false," Use ExternalBlockStore ": false," Deserialized ": false," Replication ": 1}," Number of Partitions ": 72," Numero di partizioni memorizzate nella cache ": 0, "Dimensione memoria": 0, "Dimensione ExternalBlockStore": 0, "Dimensione disco": 0}], "ID genitori": [], "Dettagli": "", "Tempo di invio": 1437616365566, "Tempo di completamento": 1437616397753, "Motivo errore": "Lavoro interrotto a causa di un errore di fase: il task 6 nella fase 2.0 non è riuscito 4 volte, errore più recente: attività persa 6.3 nella fase 2.0 (TID 12, uc1f-bioinfocloud-vamp-m-2.c. quantum-device-854.internal): ExecutorLostFailure (executor 20150526-135628-3255597322-5050-1304-S8 perso) \ nDistretto stacktrace: "," Accumulables ": []}} {" Event ":" SparkListenerJobEnd "," Job ID ": 2," Tempo di completamento ": 1437616397755," Risultato lavoro ": {" Risultato ":" JobFailed "," Eccezione ": {" Messaggio ":" Lavoro interrotto a causa di insufficienza di fase: Attività 6 nella fase 2.0 non riuscita 4 volte, errore più recente: attività persa 6.3 i n stage 2.0 (TID 12, uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal): ExecutorLostFailure (executor 20150526-135628-3255597322-5050-1304-S8 perso) \ nDistretto stacktrace: ", "Stack Trace": [{"Dichiarazione di Classe": "org.apache.spark.scheduler.DAGScheduler", "Nome del metodo": "org $ apache $ spark $ scheduler $ DAGScheduler $$ failJobAndIndependentStages", "Nome file": " DAGScheduler.scala "," Numero riga ": 1266}, {" Classe di dichiarazione ":" org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1 "," Nome metodo ":" applica "," Nome file " : "DAGScheduler.scala", "Numero riga": 1257}, {"Classe dichiarare": "org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1", "Nome metodo": "applica", "File Nome ":" DAGScheduler.scala "," Numero riga ": 1256}, {" Classe di dichiarazione ":" scala.collection.mutable.ResizableArray $ class "," Nome metodo ":" foreach "," Nome file ":" ResizableArray.scala "," Numero riga ": 59}, {" Classe di dichiarazione ":" scala.collection.mutable.ArrayBuffer "," Nome metodo ":" foreach "," Nome file ":" ArrayBuffer.scala "," Numero riga ": 47}, {" Classe di dichiarazione ":" org.apache.spark.scheduler.DAGScheduler "," Nome metodo ":" abortStage "," Nome file ":" DAGScheduler.scala "," Numero riga ": 1256}, {" Classe di dichiarazione ":" org .apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1 "," Nome metodo ":" applica "," Nome file ":" DAGScheduler.scala "," Numero riga ": 730}, {" Classe di dichiarazione ": "org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1", "Nome metodo": "applica", "Nome file": "DAGScheduler.scala", "Numero riga": 730}, {"Classe di dichiarazione ":" scala.Option "," Nome metodo ":" foreach "," Nome file ":" Option.scala "," Numero riga ": 236}, {" Classe di dichiarazione ":" org.apache.spark.scheduler .DAGScheduler "," Nome metodo ":" handleTaskSetFailed "," Nome file ":" DAGScheduler.scala "," Numero riga ": 730}, {" Classe di dichiarazione ":" org.apache.spark.scheduler.DAGSchedulerEventProcessLoop ", "Nome metodo": "onReceive", "Nome file": "DAGScheduler.scala", "Numero riga": 1450}, {"Classe di dichiarazione": "org.apache.spark.scheduler.DAGSchedulerEventProcessLoop", "Nome metodo" : "onReceive", "File Name": "DAGScheduler.scala" , "Numero riga": 1411}, {"Classe di dichiarazione": "org.apache.spark.util.EventLoop $$ anon $ 1", "Nome metodo": "esegui", "Nome file": "EventLoop.scala" , "Numero linea": 48}]}}}

+0

Spark versione 1.4.1 – User17

risposta

2

È difficile dire qual è il problema senza il registro dell'esecutore non riuscito e non del driver, ma molto probabilmente è un problema di memoria. Prova ad aumentare significativamente il numero della partizione (se la tua corrente è 32 prova 200)

+0

L'ho provato con 200 partizioni ma fallisce anche in quel momento. Anche con 800 partizioni con alcune altre impostazioni di configurazione. Ma – User17

+0

Ma sto ottenendo lo stesso problema Attività persa 4 volte rispetto a ExecutorLostFailure. A volte ottengo il tempo di connessione. Inoltre da quando sono su Google Cloud Mesos Cluster ho provato e cercato i log come suggerito e ho esaminato var/log/mesos (i log master e slave sono entrambi in/var/log/mesos per impostazione predefinita come suggerito nella documentazione di spark mesos) ma Non ho trovato nessuna buona informazione. Ci sono altri log che posso guardare o postare qui ?? Con i registri dell'esecutore intendevi la stessa cosa? – User17

3

Questo errore si verifica perché un'attività non è riuscita più di quattro volte. Prova ad aumentare il parallelismo nel tuo cluster usando il seguente parametro.

--conf "spark.default.parallelism=100" 

Impostare il valore di parallelismo su 2 o 3 volte il numero di core disponibili sul cluster. Se questo non funziona. prova ad aumentare il parallelismo in modo esponenziale. se il tuo attuale parallelismo non funziona, moltiplicalo per due e così via. Inoltre ho osservato che aiuta se il tuo livello di parallelismo è un numero primo soprattutto se stai usando groupByKkey.

+0

L'aumento del numero di partizioni non aiuta il mio caso. Ma l'impostazione del numero di parallelismo aiuta. – kennyut

2

Avevo questo problema e il problema per me era l'incidenza molto elevata di una chiave in un'attività reduceByKey. Questo è stato (penso) causando una lista enorme da raccogliere su uno degli esecutori, che avrebbe poi generato errori OOM.

La soluzione per me era quella di filtrare le chiavi con alta popolazione prima di fare il reduceByKey, ma apprezzo che questo potrebbe essere possibile o meno a seconda dell'applicazione. Non avevo comunque bisogno di tutti i miei dati.

1

La causa più comune di ExecutorLostFailure come da mia comprensione è OOM in executor.

Per risolvere il problema di OOM, è necessario capire che cosa lo causa esattamente. Aumentare semplicemente il parallelismo predefinito o aumentare la memoria dell'esecutore non è una soluzione strategica.

Se si guarda al parallelismo crescente, si cerca di creare più esecutori in modo che ogni esecutore possa lavorare su un numero sempre minore di dati. Ma se i tuoi dati sono distorti in modo tale che la chiave su cui avviene il partizionamento dei dati (per il parallelismo) abbia più dati, semplicemente aumentare il parallelismo non avrà alcun effetto.

Allo stesso modo aumentando la memoria di Executor sarà un modo molto inefficiente di consegnare uno scenario come se un solo esecutore fallisse con ExecutorLostFailure, richiedere una maggiore memoria per tutti gli esecutori farà sì che l'applicazione richieda molta più memoria di quella realmente prevista.

Problemi correlati