Nota: queste implementazioni sono un po 'viziata e non deterministica. Si prega di leggere l'intera risposta e i commenti prima di utilizzare questo codice.
Come creare una coda di lavoro che rifiuta gli elementi mentre l'executor è al di sotto della dimensione massima del pool e inizia ad accettarli una volta raggiunto il massimo?
Questo si basa sul comportamento documentato:
"Se la richiesta non può essere messo in coda, un nuovo thread si crea a meno che ciò sarebbe superare maximumPoolSize, nel qual caso, il compito sarà respinto"
public class ExecutorTest
{
private static final int CORE_POOL_SIZE = 2;
private static final int MAXIMUM_POOL_SIZE = 4;
private static final int KEEP_ALIVE_TIME_MS = 5000;
public static void main(String[] args)
{
final SaturateExecutorBlockingQueue workQueue =
new SaturateExecutorBlockingQueue();
final ThreadPoolExecutor executor =
new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME_MS,
TimeUnit.MILLISECONDS,
workQueue);
workQueue.setExecutor(executor);
for (int i = 0; i < 6; i++)
{
final int index = i;
executor.submit(new Runnable()
{
public void run()
{
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println("Runnable " + index
+ " on thread: " + Thread.currentThread());
}
});
}
}
public static class SaturateExecutorBlockingQueue
extends LinkedBlockingQueue<Runnable>
{
private ThreadPoolExecutor executor;
public void setExecutor(ThreadPoolExecutor executor)
{
this.executor = executor;
}
public boolean offer(Runnable e)
{
if (executor.getPoolSize() < executor.getMaximumPoolSize())
{
return false;
}
return super.offer(e);
}
}
}
Nota: La tua domanda mi ha sorpreso perché mi aspettavo il tuo comportamento desiderato di essere il comportamento predefinito di un ThreadPoolExecutor configurato con un corePoolSize < maximumPoolSize. Ma come si fa notare, JavaDoc per ThreadPoolExecutor indica chiaramente altrimenti.
Idea # 2
penso di avere quello che è probabilmente un approccio leggermente migliore. Si basa sul comportamento dell'effetto collaterale codificato nel metodo setCorePoolSize
in ThreadPoolExecutor
. L'idea è di aumentare temporaneamente e condizionatamente la dimensione del pool principale quando un oggetto di lavoro viene accodato. Quando si aumenta la dimensione del pool di core, lo ThreadPoolExecutor
genererà immediatamente un numero sufficiente di nuovi thread per eseguire tutte le attività in coda (queue.size()). Quindi riduciamo immediatamente la dimensione del pool di core, che consente al pool di thread di ridursi naturalmente durante i periodi futuri di attività bassa. Questo approccio non è ancora completamente deterministico (è possibile che le dimensioni del pool crescano al di sopra della dimensione massima del pool, ad esempio), ma penso che in quasi tutti i casi sia migliore della prima strategia.
In particolare, ritengo questo approccio è migliore della prima perché:
- Sarà riutilizzare fili più spesso
- Non rifiuterà esecuzione come risultato di una gara
- Vorrei menziona ancora una volta che il primo approccio fa sì che il pool di thread cresca alla sua dimensione massima anche in condizioni di uso molto leggero. Questo approccio dovrebbe essere molto più efficiente a tale riguardo.
-
public class ExecutorTest2
{
private static final int KEEP_ALIVE_TIME_MS = 5000;
private static final int CORE_POOL_SIZE = 2;
private static final int MAXIMUM_POOL_SIZE = 4;
public static void main(String[] args) throws InterruptedException
{
final SaturateExecutorBlockingQueue workQueue =
new SaturateExecutorBlockingQueue(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE);
final ThreadPoolExecutor executor =
new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME_MS,
TimeUnit.MILLISECONDS,
workQueue);
workQueue.setExecutor(executor);
for (int i = 0; i < 60; i++)
{
final int index = i;
executor.submit(new Runnable()
{
public void run()
{
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println("Runnable " + index
+ " on thread: " + Thread.currentThread()
+ " poolSize: " + executor.getPoolSize());
}
});
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
public static class SaturateExecutorBlockingQueue
extends LinkedBlockingQueue<Runnable>
{
private final int corePoolSize;
private final int maximumPoolSize;
private ThreadPoolExecutor executor;
public SaturateExecutorBlockingQueue(int corePoolSize,
int maximumPoolSize)
{
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
}
public void setExecutor(ThreadPoolExecutor executor)
{
this.executor = executor;
}
public boolean offer(Runnable e)
{
if (super.offer(e) == false)
{
return false;
}
// Uncomment one or both of the below lines to increase
// the likelyhood of the threadpool reusing an existing thread
// vs. spawning a new one.
//Thread.yield();
//Thread.sleep(0);
int currentPoolSize = executor.getPoolSize();
if (currentPoolSize < maximumPoolSize
&& currentPoolSize >= corePoolSize)
{
executor.setCorePoolSize(currentPoolSize + 1);
executor.setCorePoolSize(corePoolSize);
}
return true;
}
}
}
Eventuali duplicati: http://stackoverflow.com/questions/1800317/impossible-to-make-a-cached-thread-pool-with- a-size-limit – mnicky