2012-03-08 15 views
6

Sto cercando un ThreadPoolExecutor dove posso impostare un corePoolSize e un maximumPoolSize e quello che succede è che la coda trasferirà immediatamente l'attività al pool di thread e quindi creerà nuovi thread fino a raggiungere lo maximumPoolSize quindi inizierà ad aggiungere a una coda .Strategia Java ThreadPoolExecutor, 'Handoff diretto' con la coda?

Esiste una cosa del genere? Se no, ci sono buone ragioni per cui non ha una tale strategia?

Ciò che voglio in sostanza è che le attività vengano inoltrate per l'esecuzione e quando raggiunge un punto in cui sostanzialmente ottiene prestazioni "peggiori" dall'avere troppi thread (impostando maximumPoolSize), si fermerebbe l'aggiunta di nuovi thread e lavorare con quel pool di thread e avviare l'accodamento, quindi se la coda è piena, rifiuta.

E quando il carico ritorna verso il basso, può iniziare a smontare i fili non utilizzati fino a corePoolSize.

Questo rende più senso per me nella mia richiesta di 'tre strategie generali' elencate http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html

+0

Eventuali duplicati: http://stackoverflow.com/questions/1800317/impossible-to-make-a-cached-thread-pool-with- a-size-limit – mnicky

risposta

3

Nota: queste implementazioni sono un po 'viziata e non deterministica. Si prega di leggere l'intera risposta e i commenti prima di utilizzare questo codice.

Come creare una coda di lavoro che rifiuta gli elementi mentre l'executor è al di sotto della dimensione massima del pool e inizia ad accettarli una volta raggiunto il massimo?

Questo si basa sul comportamento documentato:

"Se la richiesta non può essere messo in coda, un nuovo thread si crea a meno che ciò sarebbe superare maximumPoolSize, nel qual caso, il compito sarà respinto"

public class ExecutorTest 
{ 
    private static final int CORE_POOL_SIZE = 2; 
    private static final int MAXIMUM_POOL_SIZE = 4; 
    private static final int KEEP_ALIVE_TIME_MS = 5000; 

    public static void main(String[] args) 
    { 
     final SaturateExecutorBlockingQueue workQueue = 
      new SaturateExecutorBlockingQueue(); 

     final ThreadPoolExecutor executor = 
      new ThreadPoolExecutor(CORE_POOL_SIZE, 
        MAXIMUM_POOL_SIZE, 
        KEEP_ALIVE_TIME_MS, 
        TimeUnit.MILLISECONDS, 
        workQueue); 

     workQueue.setExecutor(executor); 

     for (int i = 0; i < 6; i++) 
     { 
      final int index = i; 
      executor.submit(new Runnable() 
      { 
       public void run() 
       { 
        try 
        { 
         Thread.sleep(1000); 
        } 
        catch (InterruptedException e) 
        { 
         e.printStackTrace(); 
        } 

        System.out.println("Runnable " + index 
          + " on thread: " + Thread.currentThread()); 
       } 
      }); 
     } 
    } 

    public static class SaturateExecutorBlockingQueue 
     extends LinkedBlockingQueue<Runnable> 
    { 
     private ThreadPoolExecutor executor; 

     public void setExecutor(ThreadPoolExecutor executor) 
     { 
      this.executor = executor; 
     } 

     public boolean offer(Runnable e) 
     { 
      if (executor.getPoolSize() < executor.getMaximumPoolSize()) 
      { 
       return false; 
      } 
      return super.offer(e); 
     } 
    } 
} 

Nota: La tua domanda mi ha sorpreso perché mi aspettavo il tuo comportamento desiderato di essere il comportamento predefinito di un ThreadPoolExecutor configurato con un corePoolSize < maximumPoolSize. Ma come si fa notare, JavaDoc per ThreadPoolExecutor indica chiaramente altrimenti.


Idea # 2

penso di avere quello che è probabilmente un approccio leggermente migliore. Si basa sul comportamento dell'effetto collaterale codificato nel metodo setCorePoolSize in ThreadPoolExecutor. L'idea è di aumentare temporaneamente e condizionatamente la dimensione del pool principale quando un oggetto di lavoro viene accodato. Quando si aumenta la dimensione del pool di core, lo ThreadPoolExecutor genererà immediatamente un numero sufficiente di nuovi thread per eseguire tutte le attività in coda (queue.size()). Quindi riduciamo immediatamente la dimensione del pool di core, che consente al pool di thread di ridursi naturalmente durante i periodi futuri di attività bassa. Questo approccio non è ancora completamente deterministico (è possibile che le dimensioni del pool crescano al di sopra della dimensione massima del pool, ad esempio), ma penso che in quasi tutti i casi sia migliore della prima strategia.

In particolare, ritengo questo approccio è migliore della prima perché:

  1. Sarà riutilizzare fili più spesso
  2. Non rifiuterà esecuzione come risultato di una gara
  3. Vorrei menziona ancora una volta che il primo approccio fa sì che il pool di thread cresca alla sua dimensione massima anche in condizioni di uso molto leggero. Questo approccio dovrebbe essere molto più efficiente a tale riguardo.

-

public class ExecutorTest2 
{ 
    private static final int KEEP_ALIVE_TIME_MS = 5000; 
    private static final int CORE_POOL_SIZE = 2; 
    private static final int MAXIMUM_POOL_SIZE = 4; 

    public static void main(String[] args) throws InterruptedException 
    { 
     final SaturateExecutorBlockingQueue workQueue = 
      new SaturateExecutorBlockingQueue(CORE_POOL_SIZE, 
        MAXIMUM_POOL_SIZE); 

     final ThreadPoolExecutor executor = 
      new ThreadPoolExecutor(CORE_POOL_SIZE, 
        MAXIMUM_POOL_SIZE, 
        KEEP_ALIVE_TIME_MS, 
        TimeUnit.MILLISECONDS, 
        workQueue); 

     workQueue.setExecutor(executor); 

     for (int i = 0; i < 60; i++) 
     { 
      final int index = i; 
      executor.submit(new Runnable() 
      { 
       public void run() 
       { 
        try 
        { 
         Thread.sleep(1000); 
        } 
        catch (InterruptedException e) 
        { 
         e.printStackTrace(); 
        } 

        System.out.println("Runnable " + index 
          + " on thread: " + Thread.currentThread() 
          + " poolSize: " + executor.getPoolSize()); 
       } 
      }); 
     } 

     executor.shutdown(); 

     executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); 
    } 

    public static class SaturateExecutorBlockingQueue 
     extends LinkedBlockingQueue<Runnable> 
    { 
     private final int corePoolSize; 
     private final int maximumPoolSize; 
     private ThreadPoolExecutor executor; 

     public SaturateExecutorBlockingQueue(int corePoolSize, 
       int maximumPoolSize) 
     { 
      this.corePoolSize = corePoolSize; 
      this.maximumPoolSize = maximumPoolSize; 
     } 

     public void setExecutor(ThreadPoolExecutor executor) 
     { 
      this.executor = executor; 
     } 

     public boolean offer(Runnable e) 
     { 
      if (super.offer(e) == false) 
      { 
       return false; 
      } 
      // Uncomment one or both of the below lines to increase 
      // the likelyhood of the threadpool reusing an existing thread 
      // vs. spawning a new one. 
      //Thread.yield(); 
      //Thread.sleep(0); 
      int currentPoolSize = executor.getPoolSize(); 
      if (currentPoolSize < maximumPoolSize 
        && currentPoolSize >= corePoolSize) 
      { 
       executor.setCorePoolSize(currentPoolSize + 1); 
       executor.setCorePoolSize(corePoolSize); 
      } 
      return true; 
     } 
    } 
} 
+0

Bene quello che hai suggerito è quello che ho detto nel commento nella risposta ora cancellata. Comunque. Penso che troverete un'offerta dominante potrebbe causare problemi in quanto non ha un blocco. Anche se sono solo 3 istruzioni? o così tanto tempo. –

+0

@RonaldChan Ero preoccupato per la stessa cosa, e ho cercato di ragionare sui potenziali problemi, ma non ho ancora colpito nessuno. Continuerò a cercare di pensarci. –

+0

Accetta comunque la tua risposta. Questo è ciò che ho implementato alla fine, piuttosto che provare a riscrivere il metodo di esecuzione di ThreadPoolExectuor che è molto più complicato. A proposito, aggiungo anche un buffer in maximumPoolSize per essere inferiore del 5% rispetto al reale per consentire le gare. Non è il metodo ideale, ma i comportamenti predefiniti di ThreadPoolExecutor sono dispari e non riesco a trovare alcuna implementazione di ThreadPools esterna che abbia il comportamento descritto. Se c'è, prendo in considerazione la possibilità di cambiare la risposta accettata. –

2

abbiamo trovato una soluzione a questo problema con il seguente codice:

Questa coda è un ibrido SynchronousQueue/LinkedBlockingQueue.

public class OverflowingSynchronousQueue<E> extends LinkedBlockingQueue<E> { 
    private static final long serialVersionUID = 1L; 

    private SynchronousQueue<E> synchronousQueue = new SynchronousQueue<E>(); 

    public OverflowingSynchronousQueue() { 
    super(); 
    } 

    public OverflowingSynchronousQueue(int capacity) { 
    super(capacity); 
    } 

    @Override 
    public boolean offer(E e) { 
    // Create a new thread or wake an idled thread 
    return synchronousQueue.offer(e); 
    } 

    public boolean offerToOverflowingQueue(E e) { 
    // Add to queue 
    return super.offer(e); 
    } 

    @Override 
    public E take() throws InterruptedException { 
    // Return tasks from queue, if any, without blocking 
    E task = super.poll(); 
    if (task != null) { 
     return task; 
    } else { 
     // Block on the SynchronousQueue take 
     return synchronousQueue.take(); 
    } 
    } 

    @Override 
    public E poll(long timeout, TimeUnit unit) throws InterruptedException { 
    // Return tasks from queue, if any, without blocking 
    E task = super.poll(); 
    if (task != null) { 
     return task; 
    } else { 
     // Block on the SynchronousQueue poll 
     return synchronousQueue.poll(timeout, unit); 
    } 
    } 

}

Per farlo funzionare, abbiamo bisogno di avvolgere il RejectedExecutionHandler a chiamare "offerToOverflowingQueue" quando un'attività è respinta.

public class OverflowingRejectionPolicyAdapter implements RejectedExecutionHandler { 

    private OverflowingSynchronousQueue<Runnable> queue; 
    private RejectedExecutionHandler adaptedRejectedExecutionHandler; 

    public OverflowingRejectionPolicyAdapter(OverflowingSynchronousQueue<Runnable> queue, 
              RejectedExecutionHandler adaptedRejectedExecutionHandler) 
    { 
    super(); 
    this.queue = queue; 
    this.adaptedRejectedExecutionHandler = adaptedRejectedExecutionHandler; 
    } 

    @Override 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
    if (!queue.offerToOverflowingQueue(r)) { 
     adaptedRejectedExecutionHandler.rejectedExecution(r, executor); 
    } 
    } 
} 

Ecco come creiamo il ThreadPoolExecutor

public static ExecutorService newSaturatingThreadPool(int corePoolSize, 
                 int maxPoolSize, 
                 int maxQueueSize, 
                 long keepAliveTime, 
                 TimeUnit timeUnit, 
                 String threadNamePrefix, 
                 RejectedExecutionHandler rejectedExecutionHandler) 
    { 
    OverflowingSynchronousQueue<Runnable> queue = new OverflowingSynchronousQueue<Runnable>(maxQueueSize); 
    OverflowingRejectionPolicyAdapter rejectionPolicyAdapter = new OverflowingRejectionPolicyAdapter(queue, 
                            rejectedExecutionHandler); 
    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, 
                 maxPoolSize, 
                 keepAliveTime, 
                 timeUnit, 
                 queue, 
                 new NamedThreadFactory(threadNamePrefix), 
                 rejectionPolicyAdapter); 
    return executor; 
} 
Problemi correlati