2015-09-28 17 views
8

In breve:Spark molteplici contesti

grappolo EC2: 1 master 3 schiavi versione

Spark: 1.3.1

Vorrei utilizzare l'opzione spark.driver.allowMultipleContexts, un contesto locale (solo master) e un cluster (master e slave).

ottengo questo errore stacktrace (linea 29 è dove io chiamo l'oggetto che inizializzare il secondo sparkcontext):

fr.entry.Main.main(Main.scala) 
    at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1$$anonfun$apply$10.apply(SparkContext.scala:1812) 
    at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1$$anonfun$apply$10.apply(SparkContext.scala:1808) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:1808) 
    at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:1795) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:1795) 
    at org.apache.spark.SparkContext$.setActiveContext(SparkContext.scala:1847) 
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:1754) 
    at fr.entry.cluster$.<init>(Main.scala:79) 
    at fr.entry.cluster$.<clinit>(Main.scala) 
    at fr.entry.Main$delayedInit$body.apply(Main.scala:29) 
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40) 
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) 
    at scala.App$$anonfun$main$1.apply(App.scala:71) 
    at scala.App$$anonfun$main$1.apply(App.scala:71) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32) 
    at scala.App$class.main(App.scala:71) 
    at fr.entry.Main$.main(Main.scala:14) 
    at fr.entry.Main.main(Main.scala) 
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app- 20150928153330-0036/2 is now LOADING 
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app- 20150928153330-0036/0 is now RUNNING 
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/1 is now RUNNING 
15/09/28 15:33:30 INFO SparkContext: Starting job: sum at Main.scala:29 
15/09/28 15:33:30 INFO DAGScheduler: Got job 0 (sum at Main.scala:29) with 2 output partitions (allowLocal=false) 
15/09/28 15:33:30 INFO DAGScheduler: Final stage: Stage 0(sum at Main.scala:29) 
15/09/28 15:33:30 INFO DAGScheduler: Parents of final stage: List() 
15/09/28 15:33:30 INFO DAGScheduler: Missing parents: List() 
15/09/28 15:33:30 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29), which has no missing parents 
15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(2264) called with curMem=0, maxMem=55566516879 
15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.2 KB, free 51.8 GB) 
15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(1656) called with curMem=2264, maxMem=55566516879 
15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1656.0 B, free 51.8 GB) 
15/09/28 15:33:30 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:40476 (size: 1656.0 B, free: 51.8 GB) 
15/09/28 15:33:30 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 
15/09/28 15:33:30 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:839 
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/2 is now RUNNING 
15/09/28 15:33:30 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29) 
15/09/28 15:33:30 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 
15/09/28 15:33:45 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources 
15/09/28 15:34:00 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources 

Maggiori dettagli:

Vorrei eseguire un programma che fa due cose. Per prima cosa ho uno sparkContext local (solo sul master), faccio un RDD e faccio alcune operazioni. In secondo luogo ho un secondo sparkContext inizializzato con un master e 3 slave che fanno anche un RDD e fanno alcune operazioni. Quindi nel primo caso voglio usare i 16 core del master e il secondo caso voglio usare 8cores x 3 degli slave.

semplice esempio:

val arr = Array(Array(1, 2, 3, 4, 5, 6, 7, 8), Array(1, 2, 3, 4, 5, 6, 7, 8)) 
println(local.sparkContext.makeRDD(arr).count()) 
println(cluster.sparkContext.makeRDD(arr).map(l => l.sum).sum) 

I miei due SparkContexts:

object local { 

    val project = "test" 
    val version = "1.0" 

    val sc = new SparkConf() 
    .setMaster("local[16]") 
    .setAppName("Local") 
    .set("spark.local.dir", "/mnt") 
    .setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar", "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar")) 
    .setSparkHome("/root/spark") 
    .set("spark.driver.allowMultipleContexts", "true") 
    .set("spark.executor.memory", "45g") 

    val sparkContext = new SparkContext(sc) 
} 

object cluster { 

    val project = "test" 
    val version = "1.0" 

    val sc = new SparkConf() 
    .setMaster(masterURL) // ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com 
    .setAppName("Cluster") 
    .set("spark.local.dir", "/mnt") 
    .setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar", "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar") ++ otherJars) 
    .setSparkHome("/root/spark") 
    .set("spark.driver.allowMultipleContexts", "true") 
    .set("spark.executor.memory", "35g") 

    val sparkContext = new SparkContext(sc) 
} 

Come posso risolvere questo problema?

+1

È possibile includere il motivo per cui si desidera utilizzare due contesti? Più spesso questo non è richiesto – Gillespie

+0

@Gillespie, diciamo che ho 3 programmi: prog1 e prog3 possono essere eseguiti in parallelo e prog2 devono essere sequenziali. L'uscita di Prog1 è di 15 RDD (set di dati). Prog2 è un algoritmo di apprendimento automatico che ho bisogno di eseguire 15 volte. Poiché prog2 deve essere eseguito su 1 core localmente. Ho fatto un piccolo hack che è quello di fare un RDD che contiene i 15 set di dati raccolti. Mappa su questo RDD ed eseguo prog2 su ogni record. Prog3 prende i 15 risultati di prog2 e fa alcune operazioni in parallelo. Spero che questo sia chiaro? Penso che nel mio caso sia necessario, ma se non lo è, sono anche interessato a conoscere la risposta del mio piccolo esempio. – GermainGum

risposta

10

Sebbene l'opzione di configurazione spark.driver.allowMultipleContexts esista, è fuorviante poiché l'utilizzo di più contesti Spark è sconsigliato. Questa opzione è utilizzata solo per i test interni di Spark e non dovrebbe essere utilizzata nei programmi utente. È possibile ottenere risultati imprevisti durante l'esecuzione di più di un contesto Spark in una singola JVM.

+0

questo scoraggiamento è documentato ovunque? Mi piacerebbe che fosse vero che 2 è scoraggiato, ma mi piacerebbe vederlo da qualche parte, se possibile, ufficiale – Kristian

+0

Questa limitazione è stata presumibilmente risolta nella scintilla 2.0. Ci sto lavorando. – javadba

1

Se è necessario un coordinamento tra 2 programmi, sarebbe meglio farne parte di una singola applicazione Spark per sfruttare le ottimizzazioni interne di Sparks e per evitare l'I/O non necessario.

In secondo luogo, se 2 applicazioni non devono coordinarsi in alcun modo, è possibile avviare 2 applicazioni separate. Poiché si utilizza Amazon EC2/EMR, è possibile utilizzare YARN come gestore risorse senza investimenti significativi in ​​termini di tempo, come descritto in here.

1

Se si ha la necessità di lavorare con molti contesti Spark, è possibile attivare l'opzione speciale [MultipleContexts] (1), ma è utilizzato solo per i test interni di Spark e non dovrebbe essere usato nei programmi utente. Si otterrà un comportamento imprevisto durante l'esecuzione di più di un contesto Spark in una singola JVM [SPARK-2243] (2). Tuttavia, è possibile creare diversi contesti in JVM separate e gestire contesti a livello di SparkConf, che si adattano in modo ottimale ai lavori eseguibili.

Ecco come si presenta: Mist creates every new Sparkcontext in its own JVM.

C'è un middleware in cima alla Spark - [Mist]. Gestisce i contesti Spark e le JVM multiple, quindi è possibile avere diversi lavori come la pipeline ETL, un lavoro di previsione veloce, una query Hive ad-hoc e un'applicazione di streaming Spark in esecuzione in parallelo sullo stesso cluster.

1> github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/SparkContextSuite.scala#L67

2> issues.apache.org/jira/browse/SPARK-2243

0

Java:

.set("spark.driver.allowMultipleContexts", "true") 

+

sparkContext.cancelAllJobs(); 
sparkContext.stop(); 

funziona per me.