2010-08-05 13 views
13

Sto tentando di utilizzare un ThreadPoolExecutor per pianificare le attività, ma si verificano alcuni problemi con le sue politiche. Ecco il suo comportamento dichiarato:Politica ThreadPoolExecutor

  1. Se meno di discussioni corePoolSize sono in esecuzione, l'Esecutore sempre preferisce l'aggiunta di un nuovo thread, piuttosto che fare la coda.
  2. Se corePoolSize o più thread sono in esecuzione, l'Executor preferisce sempre mettere in coda una richiesta piuttosto che aggiungere un nuovo thread.
  3. Se una richiesta non può essere accodata, viene creato un nuovo thread a meno che questo non superi maximumPoolSize, nel qual caso l'attività verrà rifiutata.

Il comportamento voglio è questo:

  1. come sopra
  2. Se più di corePoolSize ma inferiore discussioni maximumPoolSize sono in esecuzione, preferisce l'aggiunta di un nuovo filo sopra accodamento, e utilizzando un thread inattivo oltre l'aggiunta di una nuova discussione.
  3. come sopra

Fondamentalmente io non voglio alcuna attività da respingere; Voglio che vengano accodati in una coda illimitata. Ma voglio avere fino a massimo threadPoolSize. Se utilizzo una coda illimitata, non genera mai thread dopo aver colpito coreSize. Se utilizzo una coda limitata, rifiuta le attività. C'è un modo per aggirare questo?

Quello a cui sto pensando ora è eseguire ThreadPoolExecutor su un SynchronousQueue, ma non nutrire attività direttamente su di esso, ma alimentarlo a un separato LinkedBlockingQueue non limitato. Quindi un altro thread si alimenta da LinkedBlockingQueue in Executor e, se uno viene rifiutato, riprova semplicemente fino a quando non viene rifiutato. Questo sembra un dolore e un po 'un trucco, però - c'è un modo più pulito per farlo?

risposta

1

Il vostro caso d'uso è comune, completamente legittimo e purtroppo più difficile di quanto ci si aspetterebbe. Per informazioni di base puoi leggere this discussion e trovare un puntatore a una soluzione (citata anche nella discussione) here. La soluzione di Shay funziona bene.

Generalmente sarei un po 'cauto sulle code illimitate; di solito è meglio avere un esplicito controllo del flusso in ingresso che degrada con grazia e regola il rapporto tra il lavoro corrente/rimanente per non sovraccaricare né il produttore né il consumatore.

1

Basta impostare corePoolsize = maximumPoolSize e utilizzare una coda illimitata?

Nella lista dei punti, 1 esclude 2, poiché corePoolSize sarà sempre inferiore o uguale a maximumPoolSize.

Modifica

C'è ancora qualcosa di incompatibile tra ciò che si vuole e ciò TPE vi offrirà.

Se si dispone di una coda illimitata, maximumPoolSize viene ignorata quindi, come è stato osservato, non verranno mai creati e utilizzati più di corePoolSize thread.

Quindi, ancora una volta, se si prende corePoolsize = maximumPoolSize con una coda illimitata, si ha ciò che si desidera, no?

+0

Oops, quello che ho scritto non era esattamente quello che volevo. Ho modificato l'originale. –

+0

Impostazione corePoolsize = maximumPoolSize è effettivamente chiuso, ma sto anche usando allowCoreThreadTimeOut (false) e prestartAllCoreThreads(). –

4

probabilmente non è necessario micro-gestire il pool di thread come richiesto.

Un pool di thread memorizzato nella cache riutilizzerà i thread inattivi consentendo anche thread concorrenti potenzialmente illimitati.Questo, naturalmente, potrebbe portare a prestazioni di fuga che si degradano dal sovraccarico di commutazione del contesto durante periodi di raffica.

Executors.newCachedThreadPool(); 

Una soluzione migliore è quella di porre un limite al numero totale di fili scartando la nozione di assicurare thread inattivi vengono utilizzati prima. Le modifiche di configurazione sarebbero:

corePoolSize = maximumPoolSize = N; 
allowCoreThreadTimeOut(true); 
setKeepAliveTime(aReasonableTimeDuration, TimeUnit.SECONDS); 

Ragionando su questo scenario, se l'esecutore ha meno di corePoolSize discussioni, che non deve essere molto occupato. Se il sistema non è molto occupato, non c'è molto danno nel girare un nuovo thread. In questo modo, il tuo ThreadPoolExecutor creerà sempre un nuovo lavoratore se è sotto il numero massimo di lavoratori consentiti. Solo quando il numero massimo di lavoratori sta "scappando", i lavoratori che aspettano inattivi per le attività ricevono compiti. Se un lavoratore attende aReasonableTimeDuration senza un'attività, può terminare. Usando limiti ragionevoli per le dimensioni del pool (dopotutto, ci sono solo così tante CPU) e un timeout ragionevolmente grande (per evitare che i thread si interrompano inutilmente), probabilmente si vedranno i benefici desiderati.

L'opzione finale è hacker. Fondamentalmente, lo ThreadPoolExecutor utilizza internamente BlockingQueue.offer per determinare se la coda ha capacità. Un'implementazione personalizzata di BlockingQueue potrebbe sempre rifiutare il tentativo offer. Quando lo ThreadPoolExecutor non riesce a eseguire l'operazione nella coda, prova a creare un nuovo operatore. Se non è possibile creare un nuovo lavoratore, verrà chiamato uno . A quel punto, un utente personalizzato RejectedExecutionHandler potrebbe forzare un put nell'usuale BlockingQueue.

/** Hackish BlockingQueue Implementation tightly coupled to ThreadPoolexecutor implementation details. */ 
class ThreadPoolHackyBlockingQueue<T> implements BlockingQueue<T>, RejectedExecutionHandler { 
    BlockingQueue<T> delegate; 

    public boolean offer(T item) { 
     return false; 
    } 

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
     delegate.put(r); 
    } 

    //.... delegate methods 
} 
Problemi correlati