È assolutamente possibile. Chiamando setCorePoolSize(int)
si modifica la dimensione principale del pool. Le chiamate a questo metodo sono impostazioni thread-safe e override fornite al costruttore di ThreadPoolExecutor
. Se si taglia la dimensione del pool, i thread rimanenti si spegneranno una volta completata la coda di lavoro corrente (se sono inattivi, si spegneranno immediatamente). Se si aumenta la dimensione del pool, i nuovi thread verranno allocati il prima possibile. Il periodo di tempo per l'allocazione di nuovi thread non è documentato, ma nell'implementazione, l'allocazione di nuovi thread viene eseguita su ogni chiamata al metodo execute
.
Per associare questo con una farm di lavoro sintonizzabile in fase di runtime, è possibile esporre questa proprietà (in base al wrapper o utilizzando un esportatore MBean dinamico) come attributo JMX di lettura-scrittura per creare un elemento piuttosto piacevole, al volo elaboratore batch sintonizzabile.
Per ridurre forzatamente la dimensione del pool in fase di esecuzione (che è la richiesta), è necessario creare una sottoclasse di ThreadPoolExecutor
e aggiungere un'interruzione al metodo beforeExecute(Thread,Runnable)
. Interrompere il thread non è un'interruzione sufficiente, dal momento che solo interagisce con gli stati di attesa e durante l'elaborazione dei thread di attività ThreadPoolExecutor
non entra in uno stato interrompibile.
Recentemente ho avuto lo stesso problema nel tentativo di arrestare forzatamente un pool di thread prima che tutte le attività inoltrate vengano eseguite. Per farlo, ho interrotto il thread generando un'eccezione di runtime solo dopo aver sostituito lo UncaughtExceptionHandler
del thread con uno che si aspetta la mia specifica eccezione e lo scarta.
/**
* A runtime exception used to prematurely terminate threads in this pool.
*/
static class ShutdownException
extends RuntimeException {
ShutdownException (String message) {
super(message);
}
}
/**
* This uncaught exception handler is used only as threads are entered into
* their shutdown state.
*/
static class ShutdownHandler
implements UncaughtExceptionHandler {
private UncaughtExceptionHandler handler;
/**
* Create a new shutdown handler.
*
* @param handler The original handler to deligate non-shutdown
* exceptions to.
*/
ShutdownHandler (UncaughtExceptionHandler handler) {
this.handler = handler;
}
/**
* Quietly ignore {@link ShutdownException}.
* <p>
* Do nothing if this is a ShutdownException, this is just to prevent
* logging an uncaught exception which is expected. Otherwise forward
* it to the thread group handler (which may hand it off to the default
* uncaught exception handler).
* </p>
*/
public void uncaughtException (Thread thread, Throwable throwable) {
if (!(throwable instanceof ShutdownException)) {
/* Use the original exception handler if one is available,
* otherwise use the group exception handler.
*/
if (handler != null) {
handler.uncaughtException(thread, throwable);
}
}
}
}
/**
* Configure the given job as a spring bean.
*
* <p>Given a runnable task, configure it as a prototype spring bean,
* injecting any necessary dependencices.</p>
*
* @param thread The thread the task will be executed in.
* @param job The job to configure.
*
* @throws IllegalStateException if any error occurs.
*/
protected void beforeExecute (final Thread thread, final Runnable job) {
/* If we're in shutdown, it's because spring is in singleton shutdown
* mode. This means we must not attempt to configure the bean, but
* rather we must exit immediately (prematurely, even).
*/
if (!this.isShutdown()) {
if (factory == null) {
throw new IllegalStateException(
"This class must be instantiated by spring"
);
}
factory.configureBean(job, job.getClass().getName());
}
else {
/* If we are in shutdown mode, replace the job on the queue so the
* next process will see it and it won't get dropped. Further,
* interrupt this thread so it will no longer process jobs. This
* deviates from the existing behavior of shutdown().
*/
workQueue.add(job);
thread.setUncaughtExceptionHandler(
new ShutdownHandler(thread.getUncaughtExceptionHandler())
);
/* Throwing a runtime exception is the only way to prematurely
* cause a worker thread from the TheadPoolExecutor to exit.
*/
throw new ShutdownException("Terminating thread");
}
}
Nel tuo caso, si consiglia di creare un semaforo (solo per l'uso come un contatore threadsafe), che non ha permesso, e quando spegnere le discussioni rilasciare per un numero di permessi che corrisponde al delta del la precedente dimensione del pool principale e la nuova dimensione del pool (che richiede l'intervento del metodo setCorePoolSize(int)
). Questo ti permetterà di terminare i tuoi thread dopo che l'attività corrente è stata completata.
private Semaphore terminations = new Semaphore(0);
protected void beforeExecute (final Thread thread, final Runnable job) {
if (terminations.tryAcquire()) {
/* Replace this item in the queue so it may be executed by another
* thread
*/
queue.add(job);
thread.setUncaughtExceptionHandler(
new ShutdownHandler(thread.getUncaughtExceptionHandler())
);
/* Throwing a runtime exception is the only way to prematurely
* cause a worker thread from the TheadPoolExecutor to exit.
*/
throw new ShutdownException("Terminating thread");
}
}
public void setCorePoolSize (final int size) {
int delta = getActiveCount() - size;
super.setCorePoolSize(size);
if (delta > 0) {
terminations.release(delta);
}
}
Questo dovrebbe interrompere n filettature per f (n) = attivo - richiesta. Se c'è qualche problema, la strategia di allocazione dello ThreadPoolExecutor
s è abbastanza duratura. Il libro mantiene la cessazione anticipata utilizzando un blocco finally
che garantisce l'esecuzione. Per questo motivo, anche se si terminano troppi thread, questi si ripopoleranno.
Per gli elettori down. Se hai letto la domanda dell'OP, la seconda risposta altamente votata non ha originariamente risposto. L'OP dice tanto. Le modifiche successive implementano i suggerimenti in questa risposta. –