2012-05-25 11 views
11

Recentemente ho lavorato con Amazon Web Services (AWS) e ho notato che non c'è molta documentazione sull'argomento, quindi ho aggiunto la mia soluzione.Come posso attendere il completamento di un flusso di lavoro Elastic MapReduce in un'applicazione Java?

Stavo scrivendo un'applicazione utilizzando Amazon Elastic MapReduce (Amazon EMR). Dopo che i calcoli sono terminati, avevo bisogno di eseguire alcuni lavori sui file creati da loro, quindi avevo bisogno di sapere quando il flusso di lavoro ha completato il suo lavoro.

Questo è come si può verificare se il vostro flusso di lavoro completato:

AmazonElasticMapReduce mapReduce = new AmazonElasticMapReduceClient(credentials); 

DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowsRequest() 
    .withJobFlowStates("COMPLETED"); 

List<JobFlowDetail> jobs = mapReduce.describeJobFlows(jobAttributes).getJobFlows(); 
JobFlowDetail detail = jobs.get(0); 

detail.getJobFlowId(); //the id of one of the completed jobs 

È anche possibile cercare per uno specifico ID lavoro in DescribeJobFlowsRequest e poi per verificare se quel lavoro ha finito di riuscita.

Spero che possa aiutare gli altri.

+5

Invio la propria soluzione al vostro problema immediatamente è abbastanza benvenuto qui, tuttavia, l'approccio desiderato è quello di dividere questo in una domanda e una risposta ancora, vedere [E 'giusto chiedere e rispondere alle vostre proprie domande] (http : //blog.stackoverflow.com/2011/07/its-ok-to-ask-and-answer-your-own-questions/) - questo aiuta a ordinare/categorizzare le cose in modo appropriato, cioè a fare spazio a domande davvero senza risposta dove applicabile, grazie! –

+0

Grazie, lo annoterò come riferimento futuro. – siditom

+0

Dovresti includere anche gli altri stati completati. Alcune persone che leggono questo potrebbero loop per sempre se inizializzano 'jobAttributes' come dato. 'DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowRequest(). WithJobFlowStates (" COMPLETED "," TERMINATED "," FAILED ");' –

risposta

1

Una volta completato il flusso di lavoro, il cluster si arresta e la partizione HDFS viene persa. al fine di prevenire la perdita di dati, configurare l'ultimo passaggio del flusso di lavoro per archiviare i risultati in Amazon S3.

Se il JobFlowInstancesDetail: il parametro KeepJobFlowAliveWhenNoSteps è impostato su TRUE, il flusso di lavoro sarà transizione allo stato di attesa piuttosto che spegnere una volta che i passi sono completati.

È consentito un massimo di 256 passaggi in ciascun flusso di lavoro.

Se il lavoro richiede molto tempo, si consiglia di archiviare i risultati periodicamente.

Per farla breve: non c'è modo di sapere quando è fatto. Invece è necessario salvare i dati come parte del lavoro.

1

Utilizzare l'opzione --wait-for-steps durante la creazione del flusso di lavoro.

./elastic-mapreduce --create \ 
... 
--wait-for-steps \ 
... 
3

Mi sono anche imbattuto in questo problema, ed ecco la soluzione che ho trovato per ora. Non è perfetto, ma spero che sarà utile. Per riferimento, sto usando Java 1.7 e AWS Java SDK versione 1.9.13.

Si noti che questo codice si presuppone che si sta aspettando la grappolo di interrompere, non il passi in senso stretto; se il tuo cluster termina quando tutti i tuoi passi sono finiti, va bene, ma se stai usando cluster che rimangono in vita dopo il completamento del passo questo non ti aiuterà troppo.

Inoltre, questo codice controlla e registra le modifiche allo stato del cluster e inoltre diagnostica se il cluster ha terminato con errori e genera un'eccezione in caso contrario.

private void yourMainMethod() { 
    RunJobFlowRequest request = ...; 

    try { 
     RunJobFlowResult submission = emr.runJobFlow(request); 
     String jobFlowId = submission.getJobFlowId(); 
     log.info("Submitted EMR job as job flow id {}", jobFlowId); 

     DescribeClusterResult result = 
      waitForCompletion(emr, jobFlowId, 90, TimeUnit.SECONDS); 
     diagnoseClusterResult(result, jobFlowId); 
    } finally { 
     emr.shutdown(); 
    } 
} 

private DescribeClusterResult waitForCompletion(
      AmazonElasticMapReduceClient emr, String jobFlowId, 
      long sleepTime, TimeUnit timeUnit) 
     throws InterruptedException { 
    String state = "STARTING"; 
    while (true) { 
     DescribeClusterResult result = emr.describeCluster(
       new DescribeClusterRequest().withClusterId(jobFlowId) 
     ); 
     ClusterStatus status = result.getCluster().getStatus(); 
     String newState = status.getState(); 
     if (!state.equals(newState)) { 
      log.info("Cluster id {} switched from {} to {}. Reason: {}.", 
        jobFlowId, state, newState, status.getStateChangeReason()); 
      state = newState; 
     } 

     switch (state) { 
      case "TERMINATED": 
      case "TERMINATED_WITH_ERRORS": 
      case "WAITING": 
       return result; 
     } 

     timeUnit.sleep(sleepTime); 
    } 
} 

private void diagnoseClusterResult(DescribeClusterResult result, String jobFlowId) { 
    ClusterStatus status = result.getCluster().getStatus(); 
    ClusterStateChangeReason reason = status.getStateChangeReason(); 
    ClusterStateChangeReasonCode code = 
     ClusterStateChangeReasonCode.fromValue(reason.getCode()); 
    switch (code) { 
    case ALL_STEPS_COMPLETED: 
     log.info("Completed EMR job {}", jobFlowId); 
     break; 
    default: 
     failEMR(jobFlowId, status); 
    } 
} 

private static void failEMR(String jobFlowId, ClusterStatus status) { 
    String msg = "EMR cluster run %s terminated with errors. ClusterStatus = %s"; 
    throw new RuntimeException(String.format(msg, jobFlowId, status)); 
} 
Problemi correlati