2016-03-28 7 views
5

Sto cercando di evitare la soluzione "while (true)" quando aspetto che il lavoro di apache spark sia terminato, ma senza successo.Come attendere correttamente il lavoro di apache spark launcher durante l'avvio da un'altra applicazione?

Ho un'applicazione spark che suppone di elaborare alcuni dati e di mettere un risultato nel database, io lo chiamo dal mio servizio di primavera e vorrei aspettare fino a quando il lavoro è finito.

Esempio:

Launcher con metodo:

@Override 
public void run(UUID docId, String query) throws Exception { 
    launcher.addAppArgs(docId.toString(), query); 

    SparkAppHandle sparkAppHandle = launcher.startApplication(); 

    sparkAppHandle.addListener(new SparkAppHandle.Listener() { 
     @Override 
     public void stateChanged(SparkAppHandle handle) { 
      System.out.println(handle.getState() + " new state"); 
     } 

     @Override 
     public void infoChanged(SparkAppHandle handle) { 
      System.out.println(handle.getState() + " new state"); 
     } 
    }); 

    System.out.println(sparkAppHandle.getState().toString()); 
} 

Come attendere correttamente fino a quando lo stato di gestore è "finito".

+0

Sei riuscito a risolvere questo? – gaurav5430

risposta

2

Sto anche utilizzando SparkLauncher da un'applicazione Spring. Ecco un riepilogo dell'approccio che ho seguito (seguendo gli esempi nel JavaDoc).

Il @Service utilizzato per avviare il processo implementa anche SparkHandle.Listener e passa un riferimento a se stesso tramite .startApplication, ad es.

... 
... 
@Service 
public class JobLauncher implements SparkAppHandle.Listener { 
... 
... 
... 
private SparkAppHandle launchJob(String mainClass, String[] args) throws Exception { 

    String appResource = getAppResourceName(); 

    SparkAppHandle handle = new SparkLauncher() 
     .setAppResource(appResource).addAppArgs(args) 
     .setMainClass(mainClass) 
     .setMaster(sparkMaster) 
     .setDeployMode(sparkDeployMode) 
     .setSparkHome(sparkHome) 
     .setConf(SparkLauncher.DRIVER_MEMORY, "2g") 
     .startApplication(this); 

    LOG.info("Launched [" + mainClass + "] from [" + appResource + "] State [" + handle.getState() + "]"); 

    return handle; 
} 

/** 
* Callback method for changes to the Spark Job 
*/ 
@Override 
public void infoChanged(SparkAppHandle handle) { 

    LOG.info("Spark App Id [" + handle.getAppId() + "] Info Changed. State [" + handle.getState() + "]"); 

} 

/** 
* Callback method for changes to the Spark Job's state 
*/ 
@Override 
public void stateChanged(SparkAppHandle handle) { 

    LOG.info("Spark App Id [" + handle.getAppId() + "] State Changed. State [" + handle.getState() + "]"); 

} 

Usando questo approccio, si può agire quando lo stato cambia a "Failed", "finito" o "ucciso".

Spero che questa informazione ti sia utile.

+0

Anche io sto affrontando lo stesso problema. Ho provato con il modo in cui OP utilizza (creando un nuovo oggetto listener anonimo) e il modo in cui descrivi. In entrambi i casi, i metodi dei listener non venivano richiamati. – Reddy

+0

@Reddy: sei riuscito a farlo funzionare? Anch'io sto affrontando lo stesso problema. L'applicazione si chiude immediatamente con l'ottenimento dell'appID e lo stato. Funziona se chiamo Thread.sleep() esplicitamente. Grazie! – Tariq

1

Ho implementato utilizzando CountDownLatch e funziona come previsto.

... 
final CountDownLatch countDownLatch = new CountDownLatch(1); 
SparkAppListener sparkAppListener = new SparkAppListener(countDownLatch); 
SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener); 
Thread sparkAppListenerThread = new Thread(sparkAppListener); 
sparkAppListenerThread.start(); 
long timeout = 120; 
countDownLatch.await(timeout, TimeUnit.SECONDS);  
    ... 

private static class SparkAppListener implements SparkAppHandle.Listener, Runnable { 
    private static final Log log = LogFactory.getLog(SparkAppListener.class); 
    private final CountDownLatch countDownLatch; 
    public SparkAppListener(CountDownLatch countDownLatch) { 
     this.countDownLatch = countDownLatch; 
    } 
    @Override 
    public void stateChanged(SparkAppHandle handle) { 
     String sparkAppId = handle.getAppId(); 
     State appState = handle.getState(); 
     if (sparkAppId != null) { 
      log.info("Spark job with app id: " + sparkAppId + ",\t State changed to: " + appState + " - " 
        + SPARK_STATE_MSG.get(appState)); 
     } else { 
      log.info("Spark job's state changed to: " + appState + " - " + SPARK_STATE_MSG.get(appState)); 
     } 
     if (appState != null && appState.isFinal()) { 
      countDownLatch.countDown(); 
     } 
    } 
    @Override 
    public void infoChanged(SparkAppHandle handle) {} 
    @Override 
    public void run() {} 
} 
Problemi correlati