Mi sono anche imbattuto in questo problema, ed ecco la soluzione che ho trovato per ora. Non è perfetto, ma spero che sarà utile. Per riferimento, sto usando Java 1.7 e AWS Java SDK versione 1.9.13.
Si noti che questo codice si presuppone che si sta aspettando la grappolo di interrompere, non il passi in senso stretto; se il tuo cluster termina quando tutti i tuoi passi sono finiti, va bene, ma se stai usando cluster che rimangono in vita dopo il completamento del passo questo non ti aiuterà troppo.
Inoltre, questo codice controlla e registra le modifiche allo stato del cluster e inoltre diagnostica se il cluster ha terminato con errori e genera un'eccezione in caso contrario.
private void yourMainMethod() {
RunJobFlowRequest request = ...;
try {
RunJobFlowResult submission = emr.runJobFlow(request);
String jobFlowId = submission.getJobFlowId();
log.info("Submitted EMR job as job flow id {}", jobFlowId);
DescribeClusterResult result =
waitForCompletion(emr, jobFlowId, 90, TimeUnit.SECONDS);
diagnoseClusterResult(result, jobFlowId);
} finally {
emr.shutdown();
}
}
private DescribeClusterResult waitForCompletion(
AmazonElasticMapReduceClient emr, String jobFlowId,
long sleepTime, TimeUnit timeUnit)
throws InterruptedException {
String state = "STARTING";
while (true) {
DescribeClusterResult result = emr.describeCluster(
new DescribeClusterRequest().withClusterId(jobFlowId)
);
ClusterStatus status = result.getCluster().getStatus();
String newState = status.getState();
if (!state.equals(newState)) {
log.info("Cluster id {} switched from {} to {}. Reason: {}.",
jobFlowId, state, newState, status.getStateChangeReason());
state = newState;
}
switch (state) {
case "TERMINATED":
case "TERMINATED_WITH_ERRORS":
case "WAITING":
return result;
}
timeUnit.sleep(sleepTime);
}
}
private void diagnoseClusterResult(DescribeClusterResult result, String jobFlowId) {
ClusterStatus status = result.getCluster().getStatus();
ClusterStateChangeReason reason = status.getStateChangeReason();
ClusterStateChangeReasonCode code =
ClusterStateChangeReasonCode.fromValue(reason.getCode());
switch (code) {
case ALL_STEPS_COMPLETED:
log.info("Completed EMR job {}", jobFlowId);
break;
default:
failEMR(jobFlowId, status);
}
}
private static void failEMR(String jobFlowId, ClusterStatus status) {
String msg = "EMR cluster run %s terminated with errors. ClusterStatus = %s";
throw new RuntimeException(String.format(msg, jobFlowId, status));
}
fonte
2014-12-20 00:46:40
Invio la propria soluzione al vostro problema immediatamente è abbastanza benvenuto qui, tuttavia, l'approccio desiderato è quello di dividere questo in una domanda e una risposta ancora, vedere [E 'giusto chiedere e rispondere alle vostre proprie domande] (http : //blog.stackoverflow.com/2011/07/its-ok-to-ask-and-answer-your-own-questions/) - questo aiuta a ordinare/categorizzare le cose in modo appropriato, cioè a fare spazio a domande davvero senza risposta dove applicabile, grazie! –
Grazie, lo annoterò come riferimento futuro. – siditom
Dovresti includere anche gli altri stati completati. Alcune persone che leggono questo potrebbero loop per sempre se inizializzano 'jobAttributes' come dato. 'DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowRequest(). WithJobFlowStates (" COMPLETED "," TERMINATED "," FAILED ");' –