18

ho presentato mucchio di posti di lavoro a un ExecutorService in Java e in qualche modo desidera sospendere temporaneamente tutti questi posti di lavoro. Qual'è il miglior modo per farlo? Come posso riprendere? O sto sbagliando tutto questo? Devo seguire qualche altro schema per ciò che voglio raggiungere (cioè capacità di mettere in pausa/riprendere i servizi di esecuzione)?Come mettere in pausa/riprendere tutti i thread in un ExecutorService in Java?

+1

Vuoi dire impedire che nuovi posti di lavoro l'esecuzione, o mettere in pausa i lavori * già in esecuzione *? –

+0

Metti in pausa i lavori già in esecuzione. La pausa/ripresa potrebbe essere chiamata dopo 'shutdown' – pathikrit

+0

In tal caso, la modalità di avvio dei lavori è praticamente irrilevante. Avrai bisogno di codice per la pausa - ad esempio, ogni attività potrebbe voler controllare periodicamente un flag "dovrei mettere in pausa". Non sarà ancora istantaneo, ovviamente. –

risposta

13

Per rispondere alla mia domanda, ho trovato un esempio di PausableThreadPoolExecutor nelle javadocs di ThreadPoolExecutoritself. Ecco la mia versione utilizzando monitor di Guava:

import com.google.common.util.concurrent.Monitor; 
import java.util.concurrent.ScheduledThreadPoolExecutor; 
import java.util.concurrent.ThreadFactory; 

public class PausableExecutor extends ScheduledThreadPoolExecutor { 

    private boolean isPaused; 

    private final Monitor monitor = new Monitor(); 
    private final Monitor.Guard paused = new Monitor.Guard(monitor) { 
     @Override 
     public boolean isSatisfied() { 
      return isPaused; 
     } 
    }; 

    private final Monitor.Guard notPaused = new Monitor.Guard(monitor) { 
     @Override 
     public boolean isSatisfied() { 
      return !isPaused; 
     } 
    }; 

    public PausableExecutor(int corePoolSize, ThreadFactory threadFactory) { 
     super(corePoolSize, threadFactory); 
    } 

    protected void beforeExecute(Thread t, Runnable r) { 
     super.beforeExecute(t, r); 
     monitor.enterWhenUninterruptibly(notPaused); 
     try { 
      monitor.waitForUninterruptibly(notPaused); 
     } finally { 
      monitor.leave(); 
     } 
    } 

    public void pause() { 
     monitor.enterIf(notPaused); 
     try { 
      isPaused = true; 
     } finally { 
      monitor.leave(); 
     } 
    } 

    public void resume() { 
     monitor.enterIf(paused); 
     try { 
      isPaused = false; 
     } finally { 
      monitor.leave(); 
     } 
    } 
} 
+2

ci sono alcune importanti differenze con la soluzione e l'esempio nelle javadocs ... (1) si Ho usato due 'Guard's in opposizione a una' Condizione' nelle javadoc; (2) hai usato 'enterIf' al di fuori di un if (che è semplicemente sbagliato); (3) Il 'leave' di' Monitor' usa 'signal' not' signalAll' (che è ciò che è veramente necessario qui); infine (4) perché aspettare su 'notPaused' se hai già inserito il' Monitor' basato su 'notPaused' (lascia semplicemente)? Tutto, non credo che Monitor è una buona scelta qui ... – Corin

+0

1) Trovo Monitor/Guardia astrazione più pulito di Guava di Stato. Solo prefetti personali qui. 2) Vuoi dire fuori prova invece di fuori se? Ho usato l'idioma documentato in Guava docs per Guard 3) Perché signalAll? Questo Esecutore riguarda solo solo i fili che esso contiene e non importa per loro se usiamo segnale o signalAll 4) Se si vede la documentazione Monitor - http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/util/concorrente/Monitor.html - Google stesso consiglia di utilizzare per separare i monitor anche se uno è l'opposto booleano dell'altro. – pathikrit

4

Il problema è che il Runnable/richiamabile si deve verificare quando per mettere in pausa/riprendere. Detto questo, ci sono molti modi per farlo, e dipende dalle tue esigenze sul modo migliore per farlo. Qualunque sia la soluzione necessaria per rendere l'attesa interrompibile, in modo che la filettatura possa essere arrestata in modo pulito.

7

ho fatto alcune critiche sulla vostra risposta accettata, ma non erano molto costruttivo ... Quindi, ecco la mia soluzione. Vorrei usare una classe come questa e poi chiamare checkIn ovunque/ogni volta che voglio funzionalità di pausa. Trovalo su GitHub!

import java.util.Date; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.locks.Condition; 
import java.util.concurrent.locks.ReentrantLock; 

/** 
* Provides a mechanism to pause multiple threads. 
* If wish your thread to participate, then it must regularly check in with an instance of this object. 
* 
* @author Corin Lawson <[email protected]> 
*/ 
public class Continue { 
    private boolean isPaused; 
    private ReentrantLock pauseLock = new ReentrantLock(); 
    private Condition unpaused = pauseLock.newCondition(); 

    public void checkIn() throws InterruptedException { 
     if (isPaused) { 
      pauseLock.lock(); 
      try { 
       while (isPaused) 
        unpaused.await(); 
      } finally { 
       pauseLock.unlock(); 
      } 
     } 
    } 

    public void checkInUntil(Date deadline) throws InterruptedException { 
     if (isPaused) { 
      pauseLock.lock(); 
      try { 
       while (isPaused) 
        unpaused.awaitUntil(deadline); 
      } finally { 
       pauseLock.unlock(); 
      } 
     } 
    } 

    public void checkIn(long nanosTimeout) throws InterruptedException { 
     if (isPaused) { 
      pauseLock.lock(); 
      try { 
       while (isPaused) 
        unpaused.awaitNanos(nanosTimeout); 
      } finally { 
       pauseLock.unlock(); 
      } 
     } 
    } 

    public void checkIn(long time, TimeUnit unit) throws InterruptedException { 
     if (isPaused) { 
      pauseLock.lock(); 
      try { 
       while (isPaused) 
        unpaused.await(time, unit); 
      } finally { 
       pauseLock.unlock(); 
      } 
     } 
    } 

    public void checkInUninterruptibly() { 
     if (isPaused) { 
      pauseLock.lock(); 
      try { 
       while (isPaused) 
        unpaused.awaitUninterruptibly(); 
      } finally { 
       pauseLock.unlock(); 
      } 
     } 
    } 

    public boolean isPaused() { 
     return isPaused; 
    } 

    public void pause() { 
     pauseLock.lock(); 
     try { 
      isPaused = true; 
     } finally { 
      pauseLock.unlock(); 
     } 
    } 

    public void resume() { 
     pauseLock.lock(); 
     try { 
      if (isPaused) { 
       isPaused = false; 
       unpaused.signalAll(); 
      } 
     } finally { 
      pauseLock.unlock(); 
     } 
    } 
} 

Ad esempio:

import java.util.concurrent.ScheduledThreadPoolExecutor; 
import java.util.concurrent.ThreadFactory; 

public class PausableExecutor extends ScheduledThreadPoolExecutor { 
    private Continue cont; 

    public PausableExecutor(int corePoolSize, ThreadFactory threadFactory, Continue c) { 
     super(corePoolSize, threadFactory); 
     cont = c; 
    } 

    protected void beforeExecute(Thread t, Runnable r) { 
     cont.checkIn(); 
     super.beforeExecute(t, r); 
    } 
} 

Questo ha il vantaggio che è possibile mettere in pausa molti fili con una singola chiamata a Continue s' pause.

+1

Grazie, ho appena usato il tuo esempio per implementare questa funzionalità, ma ho un paio di commenti, prima cheExecute catturi InterruptedException per compilare. Non è chiaro che non hai bisogno di sottoclasse ScheduledThreadPoolExecutor, puoi semplicemente usare ThreadPoolExecutor che è quello che stavo usando. PausableExcecutor interrompe solo l'esecuzione delle attività che sono state inviate ma non avviate, per mettere in pausa le attività già avviate è necessario chiamare checkIn nel codice attività stesso, ho usato checkInInterruptably() per questo ma non sono sicuro se sia una buona idea. –

+0

grazie per la condivisione - il primo di molti approcci che ho provato ha funzionato. –

+0

Dovrebbe 'boolean isPaused' essere volatile? Oppure il 'ReentrantLock' agisce come una barriera di memoria? Sto pensando ad es. thread Una chiamata 'pause()' o 'resume()', thread B chiama 'checkIn()', e thread C chiama 'isPaused()'. – Barn

4

ero alla ricerca di funzionalità di pausa/riprendere in esecutore, ma con l'aggiunta di capacità di attendere per qualsiasi attività in corso di elaborazione. Di seguito è la variante di altre grandi implementazioni di questo SO con l'aggiunta di funzioni di attesa. Lo stavo testando su executor con thread singolo. Così uso di base è:

executor.pause(); 
executor.await(10000); // blocks till current tasks processing ends 

codice della classe:

import java.util.concurrent.ScheduledThreadPoolExecutor; 
import java.util.concurrent.locks.Condition; 
import java.util.concurrent.locks.ReentrantLock; 

public class PausableScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {  
    public boolean isPaused; 
    private ReentrantLock pauseLock = new ReentrantLock(); 
    private Condition unpaused = pauseLock.newCondition(); 
    private Latch activeTasksLatch = new Latch(); 

    private class Latch { 
    private final Object synchObj = new Object(); 
    private int count; 

    public boolean awaitZero(long waitMS) throws InterruptedException { 
     long startTime = System.currentTimeMillis(); 
     synchronized (synchObj) { 
     while (count > 0) { 
      if (waitMS != 0) { 
      synchObj.wait(waitMS); 
      long curTime = System.currentTimeMillis(); 
      if ((curTime - startTime) > waitMS) {     
       return count <= 0; 
      } 
      } 
      else 
      synchObj.wait(); 
     } 
     return count <= 0; 
     } 
    } 
    public void countDown() { 
     synchronized (synchObj) { 
     if (--count <= 0) { 
      // assert count >= 0;    
      synchObj.notifyAll(); 
     } 
     } 
    } 
    public void countUp() { 
     synchronized (synchObj) { 
     count++; 
     } 
    }  
    } 

    /** 
    * Default constructor for a simple fixed threadpool 
    */ 
    public PausableScheduledThreadPoolExecutor(int corePoolSize) { 
    super(corePoolSize); 
    } 

    /** 
    * Executed before a task is assigned to a thread. 
    */ 
    @Override 
    protected void beforeExecute(Thread t, Runnable r) { 
    pauseLock.lock(); 
    try { 
     while (isPaused) 
     unpaused.await(); 
    } catch (InterruptedException ie) { 
     t.interrupt(); 
    } finally { 
     pauseLock.unlock(); 
    } 

    activeTasksLatch.countUp(); 
    super.beforeExecute(t, r); 
    } 

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
    try { 
     super.afterExecute(r, t); 
    } 
    finally { 
     activeTasksLatch.countDown(); 
    } 
    } 

    /** 
    * Pause the threadpool. Running tasks will continue running, but new tasks 
    * will not start untill the threadpool is resumed. 
    */ 
    public void pause() { 
    pauseLock.lock(); 
    try { 
     isPaused = true; 
    } finally { 
     pauseLock.unlock(); 
    } 
    } 

    /** 
    * Wait for all active tasks to end. 
    */ 
    public boolean await(long timeoutMS) { 
    // assert isPaused; 
    try { 
     return activeTasksLatch.awaitZero(timeoutMS); 
    } catch (InterruptedException e) { 
     // log e, or rethrow maybe 
    } 
    return false; 
    } 

    /** 
    * Resume the threadpool. 
    */ 
    public void resume() { 
    pauseLock.lock(); 
    try { 
     isPaused = false; 
     unpaused.signalAll(); 
    } finally { 
     pauseLock.unlock(); 
    } 
    } 

} 
+0

Questo sembra buono. Tu o qualcuno l'avete testato più a fondo? Ci sono revisioni o correzioni? Lo userò ora, perché questo non introduce ancora un'altra libreria. – Mike

+0

È in uso in applicazioni abbastanza grandi, nessun problema finora. Se ci sono errori in questo codice, sono anche disposto a sentire – marcinj

+0

@marcinj Sto provando il tuo codice executor. Funziona bene per mettere in pausa e riprendere. Ma noto che quando chiamo shutDownNow() su questo quando è in pausa, riprende e esegue alcune attività prima che sia effettivamente arrestato. Un modo per impedirlo? – ProgrAmmar

Problemi correlati