2009-09-16 16 views
5

Esiste un modo per creare esecutore che avrà sempre almeno 5 filetti, e massimo di 20 thread e coda illimitata per i compiti (cioè non è un compito viene respinta)specificando problema ThreadPoolExecutor

ho provato nuovo ThreadPoolExecutor(5, 20, 60L, TimeUnit.SECONDS, queue) con tutte le possibilità che ho pensato di per la coda:

new LinkedBlockingQueue() // never runs more than 5 threads 
new LinkedBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting 
new ArrayBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting 
new SynchronousQueue() // no tasks can wait, after 20, they are rejected 

e nessuno ha lavorato come voleva.

+0

Cosa vuoi dire che non ha funzionato? Un'eccezione? – Gandalf

+4

che è troppo vicino: Gandalf e Sarmun! – akf

+0

Gli Ent dovrebbero essere qui da un momento all'altro. . . e dov'è quella fastidiosa aquila ... – Gandalf

risposta

5

Forse qualcosa del genere potrebbe funzionare per voi? L'ho appena tirato su, quindi per favore colpiscilo. Fondamentalmente, implementa una piscina filo di overflow che viene utilizzato per alimentare il sottostante ThreadPoolExecutor

Esistono due principali spalle di disegnare vedo con esso:

  • La mancanza di un oggetto Future restituita alla submit(). Ma forse non è un problema per te.
  • La coda secondaria verrà svuotata solo nello ThreadPoolExecutor quando vengono inoltrati i lavori. Deve esserci una soluzione elegante, ma non la vedo ancora. Se sai che ci sarà un flusso costante di attività nello StusMagicExecutor allora questo potrebbe non essere un problema. ("Può" essere la parola chiave). Un'opzione potrebbe essere quella di fare in modo che le tue attività inviate vengano inoltrate allo StusMagicExecutor dopo il completamento?

di Stu Magia Esecutore:

public class StusMagicExecutor extends ThreadPoolExecutor { 
    private BlockingQueue<Runnable> secondaryQueue = new LinkedBlockingQueue<Runnable>(); //capacity is Integer.MAX_VALUE. 

    public StusMagicExecutor() { 
     super(5, 20, 60L, SECONDS, new SynchronousQueue<Runnable>(true), new RejectionHandler()); 
    } 
    public void queueRejectedTask(Runnable task) { 
     try { 
      secondaryQueue.put(task); 
     } catch (InterruptedException e) { 
      // do something 
     } 
    } 
    public Future submit(Runnable newTask) { 
     //drain secondary queue as rejection handler populates it 
     Collection<Runnable> tasks = new ArrayList<Runnable>(); 
     secondaryQueue.drainTo(tasks); 

     tasks.add(newTask); 

     for (Runnable task : tasks) 
      super.submit(task); 

     return null; //does not return a future! 
    } 
} 

class RejectionHandler implements RejectedExecutionHandler { 
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { 
     ((StusMagicExecutor)executor).queueRejectedTask(runnable); 
    } 
} 
+1

Fantastico, tnx, questo è praticamente quello che volevo. Con il wrapping di newTasks con runnable che esegue un task dopo l'ultimo, solo un thread su 20 potrebbe essere inattivo mentre abbiamo attività nella coda secondaria. – Sarmun

1

Le javadoc per ThreadPoolExecutor sono abbastanza chiare che una volta creati i thread corePoolSize, i nuovi thread verranno creati solo quando la coda è piena. Quindi se imposti core su 5 e max su 20, non otterrai mai il comportamento desiderato.

Tuttavia, se si imposta sia core e max su 20, le attività verranno aggiunte alla coda solo se tutti i 20 thread sono occupati. Ovviamente, ciò rende il requisito del "minimo 5 thread" un po 'insignificante, dal momento che tutti e 20 verranno mantenuti in vita (fino a quando non si spengono, comunque).

+0

"il minimo" non accadrà mai, a meno che tu non dica che tutti i fili interni possono morire. In ogni caso, non vedere il punto di entrambe le dimensioni principali e massime se non può essere utilizzato correttamente. Esiste un'altra classe (diversa da ThreadPoolExecutor) che soddisferà i miei requisiti? – Sarmun

1

Penso che questo problema è un difetto della classe e molto fuorviante data le combinazioni di parametri costruttore. Ecco una soluzione presa da ThreadPoolExecutor interno di SwingWorker che ho trasformato in una classe di alto livello. Non ha un minimo ma utilizza almeno un limite superiore. L'unica cosa che non so è quale colpo di prestazioni si ottiene dall'esecuzione del blocco.

public class BoundedThreadPoolExecutor extends ThreadPoolExecutor { 
    private final ReentrantLock pauseLock = new ReentrantLock(); 
    private final Condition unpaused = pauseLock.newCondition(); 
    private boolean isPaused = false; 
    private final ReentrantLock executeLock = new ReentrantLock(); 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue); 
    } 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, 
     BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue, 
       threadFactory); 
    } 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, 
      BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue, 
       handler); 
    } 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, 
      BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, 
      RejectedExecutionHandler handler) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue, 
       threadFactory, handler); 
    } 

    @Override 
    public void execute(Runnable command) { 
     executeLock.lock(); 
     try { 
      pauseLock.lock(); 
      try { 
       isPaused = true; 
      } finally { 
       pauseLock.unlock(); 
      } 
      setCorePoolSize(getMaximumPoolSize()); 
      super.execute(command); 
      setCorePoolSize(0); 
      pauseLock.lock(); 
      try { 
       isPaused = false; 
       unpaused.signalAll(); 
      } finally { 
       pauseLock.unlock(); 
      } 
     } finally { 
      executeLock.unlock(); 
     } 
    } 

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
     super.afterExecute(r, t); 
     pauseLock.lock(); 
     try { 
      while (isPaused) { 
       unpaused.await(); 
      } 
     } catch (InterruptedException ignore) { 

     } finally { 
      pauseLock.unlock(); 
     } 
    } 
} 
Problemi correlati