2015-02-25 8 views
13


Si dice in Apache Spark documentazione "all'interno di ogni applicazione Spark, più‘lavori’(Spark azioni) può essere in esecuzione contemporaneamente, se sono state presentate da diversi fili ". Qualcuno può spiegare come ottenere questa concorrenza per il seguente codice di esempio?
Come eseguire processi simultanei (azioni) in Apache Spark usando contesto sola scintilla

SparkConf conf = new SparkConf().setAppName("Simple_App"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 

    JavaRDD<String> file1 = sc.textFile("/path/to/test_doc1"); 
    JavaRDD<String> file2 = sc.textFile("/path/to/test_doc2"); 

    System.out.println(file1.count()); 
    System.out.println(file2.count()); 

Questi due lavori sono indipendenti e devono essere eseguiti contemporaneamente.
Grazie.

+0

Avresti bisogno di iniziare un nuovo thread, o due (potete trovare le istruzioni per quella on-line, ne sono certo), e poi utilizzare lo stesso SparkContext da entrambi i thread . – pzecevic

+0

@pzecevic Grazie per la risposta. Ho scritto un codice di esempio e sono in grado di eseguire il threading in modalità locale. Tuttavia, durante l'esecuzione nel cluster YARN, il contesto spark termina con lo stato SUCCEEDED prima che l'esecuzione del thread sia completa e quindi non ottengo alcun output. Puoi suggerire qualcosa al riguardo? Posso condividere il codice ma c'è un limite nella sezione commenti. – Sporty

+0

@Sporty Non ne sono sicuro, ma non configuro questo conf? 'spark.streaming.concurrentJobs' – void

risposta

16

provare qualcosa di simile:

final JavaSparkContext sc = new JavaSparkContext("local[2]","Simple_App"); 
    ExecutorService executorService = Executors.newFixedThreadPool(2); 
    // Start thread 1 
    Future<Long> future1 = executorService.submit(new Callable<Long>() { 
     @Override 
     public Long call() throws Exception { 
      JavaRDD<String> file1 = sc.textFile("/path/to/test_doc1"); 
      return file1.count(); 
     } 
    }); 
    // Start thread 2 
    Future<Long> future2 = executorService.submit(new Callable<Long>() { 
     @Override 
     public Long call() throws Exception { 
      JavaRDD<String> file2 = sc.textFile("/path/to/test_doc2"); 
      return file2.count(); 
     } 
    }); 
    // Wait thread 1 
    System.out.println("File1:"+future1.get()); 
    // Wait thread 2 
    System.out.println("File2:"+future2.get()); 
+2

Non possiamo semplicemente usare il comando 'spark.streaming.concurrentJobs' per impostare il livello di concorrenza? – void

Problemi correlati