2010-05-03 14 views
73

Sto cercando un'implementazione ExecutorService che può essere fornita con un timeout. Le attività inviate a ExecutorService vengono interrotte se impiegano più tempo del timeout per l'esecuzione. Implementare una tale bestia non è un compito così difficile, ma mi chiedo se qualcuno sa di un'implementazione esistente.ExecutorService che interrompe le attività dopo un timeout

Ecco cosa mi è venuto in mente in base ad alcune delle discussioni seguenti. Qualche commento?

+0

È che 'ora di inizio' del timeout momento della presentazione? O il tempo in cui l'attività inizia ad essere eseguita? –

+0

Buona domanda. Quando inizia l'esecuzione. Presumibilmente usando l'hook 'protected void beforeExecute (Thread t, Runnable r)'. –

+0

@ scompt.com stai ancora utilizzando questa soluzione o è stata sostituita –

risposta

69

è possibile utilizzare un ScheduledExecutorService per questo. Per prima cosa lo invii solo una volta per iniziare immediatamente e conservare il futuro che viene creato. Successivamente è possibile inviare una nuova attività che annullerebbe il futuro conservato dopo un certo periodo di tempo.

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); 
final Future handler = executor.submit(new Callable(){ ... }); 
executor.schedule(new Runnable(){ 
    public void run(){ 
     handler.cancel(); 
    }  
}, 10000, TimeUnit.MILLISECONDS); 

Questo eseguirà il gestore (funzionalità principale per essere interrotto) per 10 secondi, quindi si annulla (cioè interrupt) che compito specifico.

+9

Idea interessante, ma cosa succede se l'attività termina prima del timeout (che normalmente avverrà)? Preferirei non avere tonnellate di attività di pulizia in attesa di essere eseguito solo per scoprire che l'attività assegnata è già stata completata. Dovrebbe esserci un altro thread per monitorare i Futures mentre finiscono di rimuovere i loro compiti di pulizia. –

+2

L'esecutore pianificherà questo annullamento solo una volta. Se l'attività è completata, l'annullamento è un no op e il lavoro continua invariato. Ci deve essere solo uno schema aggiuntivo per cancellare le attività e un thread per eseguirle. Potresti avere due esecutori, uno per inviare i tuoi compiti principali e uno per cancellarli. –

+2

È vero, ma cosa succede se il timeout è di 5 ore e in quel momento vengono eseguite attività 10k. Mi piacerebbe evitare che tutti quei no-op in giro occupino memoria e causino interruttori di contesto. –

3

Completare l'attività in FutureTask e specificare il timeout per FutureTask. Guardare l'esempio nella mia risposta a questa domanda,

java native Process timeout

+0

Mi rendo conto che ci sono un paio di modi per farlo usando le classi 'java.util.concurrent', ma sto cercando un'implementazione' ExecutorService'. –

+0

Se si sta dicendo che si desidera che ExecutorService nasconda il fatto che i timeout vengono aggiunti dal codice client, è possibile implementare il proprio ExecutorService che racchiude ogni eseguibile consegnato ad esso con un FutureTask prima di eseguirli. – erikprice

5

Sfortunatamente la soluzione è difettosa. C'è una specie di bug con ScheduledThreadPoolExecutor, anche riportato in this question: l'annullamento di un'attività inoltrata non rilascia completamente le risorse di memoria associate all'attività; le risorse vengono rilasciate solo quando l'attività scade.

Se si crea quindi uno TimeoutThreadPoolExecutor con una scadenza piuttosto lunga (un utilizzo tipico) e si inoltrano le attività abbastanza velocemente, si finisce col riempire la memoria, anche se le attività sono state effettivamente completate correttamente.

si può vedere il problema con il seguente (molto grezzo) programma di test:

public static void main(String[] args) throws InterruptedException { 
    ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, 
      new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES); 
    //ExecutorService service = Executors.newFixedThreadPool(1); 
    try { 
     final AtomicInteger counter = new AtomicInteger(); 
     for (long i = 0; i < 10000000; i++) { 
      service.submit(new Runnable() { 
       @Override 
       public void run() { 
        counter.incrementAndGet(); 
       } 
      }); 
      if (i % 10000 == 0) { 
       System.out.println(i + "/" + counter.get()); 
       while (i > counter.get()) { 
        Thread.sleep(10); 
       } 
      } 
     } 
    } finally { 
     service.shutdown(); 
    } 
} 

Il programma esaurisce la memoria disponibile, anche se attende il deposto le uova Runnable s per completare.

Ho pensato a questo per un po ', ma sfortunatamente non sono riuscito a trovare una buona soluzione.

MODIFICA: Ho scoperto che questo problema è stato segnalato come JDK bug 6602600 e sembra che sia stato risolto molto recentemente.

1

Sembra che il problema non sia nell'errore 6602600 JDK (è stato risolto nel 2010-05-22), ma nel chiamata errata di sonno (10) in cerchio. Nota aggiuntiva: la Discussione principale deve dare direttamente CHANCE ad altri thread per realizzare le loro attività invocando SLEEP (0) in OGNI ramo del cerchio esterno. È preferibile, credo, utilizzare Thread.yield() anziché Thread.sonno (0)

Il risultato corretto parte del precedente codice problema è tale simili:

....................... 
........................ 
Thread.yield();   

if (i % 1000== 0) { 
System.out.println(i + "/" + counter.get()+ "/"+service.toString()); 
} 

//     
//    while (i > counter.get()) { 
//     Thread.sleep(10); 
//    } 

Esso funziona correttamente con quantità di contatore esterno fino a 150 000 000 cerchi testati.

0

Che dire di questa idea alternativa:

  • due hanno due esecutori:
    • uno per:
      • presentando il compito, senza preoccuparsi il timeout del compito
      • aggiungendo il risultato futuro e il momento in cui dovrebbe terminare con una struttura interna
    • uno per l'esecuzione di un lavoro interno che sta controllando la struttura interna se alcune attività sono scadute e se devono essere annullate.

piccolo campione è qui:

public class AlternativeExecutorService 
{ 

private final CopyOnWriteArrayList<ListenableFutureTask> futureQueue  = new CopyOnWriteArrayList(); 
private final ScheduledThreadPoolExecutor    scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job 
private final ListeningExecutorService     threadExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for 
private ScheduledFuture scheduledFuture; 
private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L; 

public AlternativeExecutorService() 
{ 
    scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS); 
} 

public void pushTask(OwnTask task) 
{ 
    ListenableFuture<Void> future = threadExecutor.submit(task); // -> create your Callable 
    futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end 
} 

public void shutdownInternalScheduledExecutor() 
{ 
    scheduledFuture.cancel(true); 
    scheduledExecutor.shutdownNow(); 
} 

long getCurrentMillisecondsTime() 
{ 
    return Calendar.getInstance().get(Calendar.MILLISECOND); 
} 

class ListenableFutureTask 
{ 
    private final ListenableFuture<Void> future; 
    private final OwnTask    task; 
    private final long     milliSecEndTime; 

    private ListenableFutureTask(ListenableFuture<Void> future, OwnTask task, long milliSecStartTime) 
    { 
     this.future = future; 
     this.task = task; 
     this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().convert(task.getTimeoutDuration(), TimeUnit.MILLISECONDS); 
    } 

    ListenableFuture<Void> getFuture() 
    { 
     return future; 
    } 

    OwnTask getTask() 
    { 
     return task; 
    } 

    long getMilliSecEndTime() 
    { 
     return milliSecEndTime; 
    } 
} 

class TimeoutManagerJob implements Runnable 
{ 
    CopyOnWriteArrayList<ListenableFutureTask> getCopyOnWriteArrayList() 
    { 
     return futureQueue; 
    } 

    @Override 
    public void run() 
    { 
     long currentMileSecValue = getCurrentMillisecondsTime(); 
     for (ListenableFutureTask futureTask : futureQueue) 
     { 
      consumeFuture(futureTask, currentMileSecValue); 
     } 
    } 

    private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue) 
    { 
     ListenableFuture<Void> future = futureTask.getFuture(); 
     boolean isTimeout = futureTask.getMilliSecEndTime() >= currentMileSecValue; 
     if (isTimeout) 
     { 
      if (!future.isDone()) 
      { 
       future.cancel(true); 
      } 
      futureQueue.remove(futureTask); 
     } 
    } 
} 

class OwnTask implements Callable<Void> 
{ 
    private long  timeoutDuration; 
    private TimeUnit timeUnit; 

    OwnTask(long timeoutDuration, TimeUnit timeUnit) 
    { 
     this.timeoutDuration = timeoutDuration; 
     this.timeUnit = timeUnit; 
    } 

    @Override 
    public Void call() throws Exception 
    { 
     // do logic 
     return null; 
    } 

    public long getTimeoutDuration() 
    { 
     return timeoutDuration; 
    } 

    public TimeUnit getTimeUnit() 
    { 
     return timeUnit; 
    } 
} 
} 
1

Dopo sacco di tempo per rilevare,
Infine, io uso invokeAll metodo ExecutorService per risolvere questo problema.
Ciò interromperà rigorosamente l'attività durante l'esecuzione dell'attività.
Ecco esempio

ExecutorService executorService = Executors.newCachedThreadPool(); 

try { 
    List<Callable<Object>> callables = new ArrayList<>(); 
    // Add your long time task (callable) 
    callables.add(new VaryLongTimeTask()); 
    // Assign tasks for specific execution timeout (e.g. 2 sec) 
    List<Future<Object>> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS); 
    for (Future<Object> future : futures) { 
     // Getting result 
    } 
} catch (InterruptedException e) { 
    e.printStackTrace(); 
} 

executorService.shutdown(); 

Il pro è che si può anche presentare ListenableFuture allo stesso ExecutorService.
Basta modificare leggermente la prima riga di codice.

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); 

ListeningExecutorService è la funzione di ascolto di ExecutorService al progetto di Google guava (com.google.guava))

+1

Grazie per aver indicato 'invokeAll'. Funziona molto bene. Solo una parola di cautela per chiunque pensi di usarlo: sebbene 'invokeAll' restituisca una lista di oggetti' Future', in realtà sembra essere un'operazione di blocco. – mxro

1

Utilizzando John W risposta ho creato un'implementazione che iniziano correttamente il timeout quando l'attività inizia il suo esecuzione. Ho anche scritto un test di unità per esso :)

Tuttavia, non soddisfa le mie esigenze in quanto alcune operazioni IO non interrompe quando Future.cancel() viene chiamato (cioè quando Thread.interrupted() si chiama).

Comunque se qualcuno è interessato, ho creato un gist: https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf

Problemi correlati