Non c'è modo per la scintilla di uccidere i suoi compiti se è troppo lungo.
Ma ho trovato un modo per gestire questo utilizzando speculation,
Ciò significa che se una o più attività sono in esecuzione lentamente in una fase, che sarà rilanciato.
spark.speculation true
spark.speculation.multiplier 2
spark.speculation.quantile 0
Nota: spark.speculation.quantile
significa che la "speculazione" entreranno in gioco dal vostro primo compito. Quindi usalo con cautela. Lo sto utilizzando perché alcuni lavori vengono rallentati a causa di GC nel tempo. Quindi penso che dovresti sapere quando usarlo - non è un proiettile d'argento.
Alcuni link rilevanti: http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-always-wait-for-stragglers-to-finish-running-td14298.html e http://mail-archives.us.apache.org/mod_mbox/spark-user/201506.mbox/%[email protected].com%3E
Aggiornamento
ho trovato una soluzione per il mio problema (potrebbe non funzionare per tutti). Ho avuto un sacco di simulazioni in esecuzione per attività, quindi ho aggiunto il timeout in giro. Se una simulazione richiede più tempo (a causa di una inclinazione dei dati per quella specifica esecuzione), verrà sospesa.
ExecutorService executor = Executors.newCachedThreadPool();
Callable<SimResult> task =() -> simulator.run();
Future<SimResult> future = executor.submit(task);
try {
result = future.get(1, TimeUnit.MINUTES);
} catch (TimeoutException ex) {
future.cancel(true);
SPARKLOG.info("Task timed out");
}
Assicurarsi di gestire un interrupt interno ciclo principale 's il simulator
come:
if(Thread.currentThread().isInterrupted()){
throw new InterruptedException();
}
Forse è bello accelerare un po 'di tempo a capire perché questo sta accadendo e vedere se è possibile evitarlo. Il più delle volte, questo accade perché il partizionamento non è bilanciato uniformemente tra i tasti, risultando in alcune chiavi con pochi valori e altre con una quantità enorme di valori. – hveiga