2015-07-31 10 views
12

Sto tentando di inviare un JAR con il lavoro Spark nel cluster YARN dal codice Java. Sto usando SparkLauncher a presentare esempio SparkPi:Spark Launcher in attesa di completamento del lavoro infinitamente

Process spark = new SparkLauncher() 
    .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar") 
    .setMainClass("org.apache.spark.examples.SparkPi") 
    .setMaster("yarn-cluster") 
    .launch(); 
System.out.println("Waiting for finish..."); 
int exitCode = spark.waitFor(); 
System.out.println("Finished! Exit code:" + exitCode); 

ci sono due problemi:

  1. Pur presentando in modalità "filo-cluster", l'applicazione è con successo presentata al filato ed esegue con successo (è visibile nell'interfaccia utente YARN, riportato come SUCCESS e pi viene stampato nell'output). Tuttavia, l'applicazione di invio non viene mai notificata che l'elaborazione è terminata - si blocca all'infinito dopo la stampa "In attesa di finire ..." È possibile trovare il log del contenitore here
  2. Mentre si invia in modalità "filato-client", l'applicazione non appare nell'interfaccia utente di YARN e l'applicazione di invio si blocca su "In attesa di finire ..." Quando il codice sospeso viene ucciso, l'applicazione viene visualizzata nell'interfaccia utente YARN e viene segnalata come SUCCESS, ma l'output è vuoto (non viene stampato pi su). Il registro del contenitore può essere trovato here

ho cercato di eseguire l'applicazione A entrambi con Oracle Java 7 e 8.

risposta

14

ho ricevuto aiuto nel Spark mailing list. La chiave è leggere/cancellare getInputStream e getErrorStream() sul processo. Il processo figlio potrebbe riempire il buffer e causare un deadlock: vedere Oracle docs regarding Process. I flussi devono essere letti in thread separati:

Process spark = new SparkLauncher() 
    .setSparkHome("C:\\spark-1.4.1-bin-hadoop2.6") 
    .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar") 
    .setMainClass("org.apache.spark.examples.SparkPi").setMaster("yarn-cluster").launch(); 

InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(spark.getInputStream(), "input"); 
Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input"); 
inputThread.start(); 

InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(spark.getErrorStream(), "error"); 
Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error"); 
errorThread.start(); 

System.out.println("Waiting for finish..."); 
int exitCode = spark.waitFor(); 
System.out.println("Finished! Exit code:" + exitCode); 

dove la classe InputStreamReaderRunnable è:

public class InputStreamReaderRunnable implements Runnable { 

    private BufferedReader reader; 

    private String name; 

    public InputStreamReaderRunnable(InputStream is, String name) { 
     this.reader = new BufferedReader(new InputStreamReader(is)); 
     this.name = name; 
    } 

    public void run() { 
     System.out.println("InputStream " + name + ":"); 
     try { 
      String line = reader.readLine(); 
      while (line != null) { 
       System.out.println(line); 
       line = reader.readLine(); 
      } 
      reader.close(); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    } 
} 
+0

Nel mio caso, ho avuto un problema di classpath, quindi la scintilla è terminata immediatamente. Quindi, se sembra a qualcun altro che semplicemente non sta chiamando la tua app scintilla, anche questa risposta funziona. – jmmut

7

Poiché si tratta di un vecchio post, vorrei aggiungere un aggiornamento che potrebbe aiutare chi mai letto questo post dopo. Nella scintilla 1.6.0 ci sono alcune funzioni aggiunte nella classe SparkLauncher. Quale è:

def startApplication(listeners: <repeated...>[Listener]): SparkAppHandle 

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.launcher.SparkLauncher

è possibile eseguire l'applicazione con la necessità per i thread aggiuntivi per l'stdout e stderr movimentazione peluche c'è un bel report di stato di l'applicazione in esecuzione. Utilizzare questo codice:

val env = Map(
     "HADOOP_CONF_DIR" -> hadoopConfDir, 
     "YARN_CONF_DIR" -> yarnConfDir 
    ) 
    val handler = new SparkLauncher(env.asJava) 
     .setSparkHome(sparkHome) 
     .setAppResource("Jar/location/.jar") 
     .setMainClass("path.to.the.main.class") 
     .setMaster("yarn-client") 
     .setConf("spark.app.id", "AppID if you have one") 
     .setConf("spark.driver.memory", "8g") 
     .setConf("spark.akka.frameSize", "200") 
     .setConf("spark.executor.memory", "2g") 
     .setConf("spark.executor.instances", "32") 
     .setConf("spark.executor.cores", "32") 
     .setConf("spark.default.parallelism", "100") 
     .setConf("spark.driver.allowMultipleContexts","true") 
     .setVerbose(true) 
     .startApplication() 
println(handle.getAppId) 
println(handle.getState) 

È possibile mantenere lo stato enquering se l'applicazione scintilla fino a dare il successo. Per informazioni su come il server Spark Launcher funziona in 1.6.0. vedi questo link: https://github.com/apache/spark/blob/v1.6.0/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java

+3

Vorrei sottolineare che funziona solo in modalità client. – msemelman

+0

@msemelman Grazie molte per questo chiarimento, mi sono bloccato su questo. Come hai imparato questo fatto? –

+0

Funziona anche in modalità cluster. Sto usando Spark-1.6.1 – Tariq

3

Ho implementato utilizzando CountDownLatch e funziona come previsto. Questo è per SparkLauncher versione 2.0.1 e funziona anche in modalità Yarn-cluster.

... 
final CountDownLatch countDownLatch = new CountDownLatch(1); 
SparkAppListener sparkAppListener = new SparkAppListener(countDownLatch); 
SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener); 
Thread sparkAppListenerThread = new Thread(sparkAppListener); 
sparkAppListenerThread.start(); 
long timeout = 120; 
countDownLatch.await(timeout, TimeUnit.SECONDS);  
    ... 

private static class SparkAppListener implements SparkAppHandle.Listener, Runnable { 
    private static final Log log = LogFactory.getLog(SparkAppListener.class); 
    private final CountDownLatch countDownLatch; 
    public SparkAppListener(CountDownLatch countDownLatch) { 
     this.countDownLatch = countDownLatch; 
    } 
    @Override 
    public void stateChanged(SparkAppHandle handle) { 
     String sparkAppId = handle.getAppId(); 
     State appState = handle.getState(); 
     if (sparkAppId != null) { 
      log.info("Spark job with app id: " + sparkAppId + ",\t State changed to: " + appState + " - " 
        + SPARK_STATE_MSG.get(appState)); 
     } else { 
      log.info("Spark job's state changed to: " + appState + " - " + SPARK_STATE_MSG.get(appState)); 
     } 
     if (appState != null && appState.isFinal()) { 
      countDownLatch.countDown(); 
     } 
    } 
    @Override 
    public void infoChanged(SparkAppHandle handle) {} 
    @Override 
    public void run() {} 
} 
+0

Questo è davvero un commento, non una risposta. Una volta raggiunto 50 [reputazione] (// stackoverflow.com/help/whats-reputation), sarai in grado di [commentare tutti i post] (// stackoverflow.com/privileges/comment). – dorukayhan

Problemi correlati