2010-02-11 9 views
22

Sto utilizzando ExecutorService per la facilità del programma multithreading simultaneo. Prendere seguente codice:ExecutorService, metodo standard per evitare che la coda di attività si riempia troppo

 

while(xxx) 
ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS); 
... 
Future<..> ... = exService.submit(..); 
... 
} 
 

Nel mio caso il problema è che presentare() non è il blocco se tutti NUMBER_THREADS sono occupati. La conseguenza è che la coda delle attività è inondata da molte attività. La conseguenza di ciò è che la chiusura del servizio di esecuzione con ExecutorService.shutdown() richiede anni (ExecutorService.isTerminated() sarà false per un lungo periodo di tempo). Il motivo è che la coda delle attività è ancora abbastanza piena.

Per ora la mia soluzione è quella di lavorare con i semafori per non consentire di avere a molte voci dentro la coda compito di ExecutorService:

 

... 
Semaphore semaphore=new Semaphore(NUMBER_THREADS); 

while(xxx) 
ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS); 
... 
semaphore.aquire(); 
// internally the task calls a finish callback, which invokes semaphore.release() 
// -> now another task is added to queue 
Future<..> ... = exService.submit(..); 
... 
} 
 

Sono sicuro che ci sia una soluzione migliore più Encapsulated?

risposta

4

È meglio creare autonomamente lo ThreadPoolExecutor (che è Esecutors.newXXX() in ogni caso).

Nel costruttore, è possibile passare un BlockingQueue per l'Executor da utilizzare come coda delle attività. Se si passa in una dimensione vincolata BlockingQueue (come LinkedBlockingQueue), dovrebbe ottenere l'effetto desiderato.

ExecutorService exService = new ThreadPoolExecutor(NUMBER_THREADS, NUMBER_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(workQueueSize)); 
+0

vedo. Ho trascurato un dettaglio su http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool%28int%29. eccolo menzionato, che come standard viene usata una coda illimitata. –

+6

l'ho provato. sfortunatamente la tua soluzione non blocca (come voglio) ma lancia una RejectedExecutionException. trovato anche: http://www.velocityreviews.com/forums/t389526-threadpoolexecutor-with-blocking-execute.html. i workaround presentati sembrano essere più complicati come esempio del mio semaforo, dannazione! –

+3

questo non funziona a causa di RejectedExecutionException se la coda è piena – user249654

5

È possibile ThreadPoolExecutor.getQueue(). Size() per scoprire la dimensione della coda di attesa. È possibile eseguire un'azione se la coda è troppo lunga. Suggerisco di eseguire l'attività nel thread corrente se la coda è troppo lunga per rallentare il produttore (se appropriato)

4

Un vero threading ThreadPoolExecutor è stato nella lista dei desideri di molti, c'è persino un bug JDC aperto su di esso . Sto affrontando lo stesso problema, e mi sono imbattuto in questo modo: http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html

E 'l'implementazione di un BlockingThreadPoolExecutor, implementato usando un RejectionPolicy che utilizza l'offerta per aggiungere il compito di coda, in attesa che la coda per avere spazio. Sembra buona.

+0

[Questa risposta] (http://stackoverflow.com/a/4522411/394431) a un'altra domanda suggerisce l'uso di una sottoclasse 'BlockingQueue' personalizzata che blocca su' offerta() 'delegando a' put() '. Penso che finisca per funzionare più o meno lo stesso di "RejectedExecutionHandler". –

1

è possibile aggiungere un'altra coda di bloqueing che ha dimensioni limitate per il controllo della dimensione della coda interna in executorService, alcuni pensano come il semaforo ma molto semplice. prima dell'esecutore che metti() e mentre l'operazione esegue achive take(). prendere() deve essere all'interno del codice compito

21

Il trucco è quello di utilizzare una dimensione della coda fisse e:

new ThreadPoolExecutor.CallerRunsPolicy() 

mi consiglia anche di utilizzare Guava di ListeningExecutorService. Ecco un esempio di coda consumatore/produttore.

private ListeningExecutorService producerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); 
private ListeningExecutorService consumerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); 

private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { 
    return new ThreadPoolExecutor(nThreads, nThreads, 
            5000L, TimeUnit.MILLISECONDS, 
            new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); 
} 

Qualsiasi cosa è meglio e si potrebbe prendere in considerazione un MQ come RabbitMQ o ActiveMQ come hanno la tecnologia QoS.

+0

Ottima risposta, con un esempio completo. –

Problemi correlati