2010-05-07 19 views
14

Sto lavorando con un java.util.concurrent.ThreadPoolExecutor per elaborare un numero di elementi in parallelo. Anche se il threading funziona correttamente, a volte abbiamo incontrato altri vincoli di risorse a causa di azioni che si verificano nei thread, il che ci ha portato a selezionare il numero di thread nel pool.Ridimensionamento dinamico di java.util.concurrent.ThreadPoolExecutor mentre è in attesa di attività

Mi piacerebbe sapere se c'è un modo per azzerare il numero dei thread mentre i thread stanno effettivamente lavorando. So che è possibile chiamare setMaximumPoolSize() e/o setCorePoolSize(), ma questi ridimensionano il pool solo una volta che i thread diventano inattivi, ma non diventano inattivi finché non ci sono attività in attesa in coda.

risposta

4

Per quanto posso dire, questo non è possibile in un bel modo pulito.

È possibile implementare il metodo beforeExecute per verificare alcuni valori booleani e forzare l'interruzione temporanea dei thread. Tieni presente che conterranno un'attività che non verrà eseguita finché non verranno riattivati.

In alternativa, è possibile implementare afterExecute per generare un RuntimeException quando si è saturi. Ciò causerà effettivamente la morte del Thread e poiché l'Executor sarà sopra il massimo, non ne verrebbe creato uno nuovo.

Non ti consiglio di farlo. Invece, prova a trovare un altro modo per controllare l'esecuzione concomitante delle attività che ti stanno causando un problema. Possibilmente eseguendoli in un pool di thread separato con un numero più limitato di lavoratori.

+0

Per gli elettori down. Se hai letto la domanda dell'OP, la seconda risposta altamente votata non ha originariamente risposto. L'OP dice tanto. Le modifiche successive implementano i suggerimenti in questa risposta. –

0

Ho letto la documentazione di setMaximumPoolSize() e setCorePoolSize() e sembra che possano produrre il comportamento necessario.

- EDIT -

mia conclusione era sbagliata: si veda la discussione sotto per i dettagli ...

+0

I lavoratori non uscirà immediatamente. "Se il nuovo valore è inferiore al valore corrente, i thread esistenti in eccesso verranno terminati quando diventano inattivi." Finché ci sono compiti nell'esecutore, o entrano nell'esecutore in modo tempestivo, gli operai non terminano. –

+0

@ Tim: sì, questo è quello che sta dicendo Eyal. L'OP vuole che i suoi thread esistenti siano terminati senza riguardo? Quello non sarebbe qualcosa che un esecutore potrebbe fare anche se volesse.L'OP potrebbe impostare un booleano su alcune attività per dire loro di terminare presto, e sarebbe probabilmente una buona idea inviare un interrupt quando è impostato il valore booleano. Ma questa è la logica a livello di app - non qualcosa che un esecutore potrebbe sfuggire di mano. –

+0

Un'altra idea casuale sarebbe se l'Esecutore potesse limitare un certo numero di thread riducendo la loro priorità. Ciò potrebbe dare lo stesso effetto complessivo. Questa è solo un'idea casuale - potrebbe avere enormi difetti a cui non sto pensando. –

32

È assolutamente possibile. Chiamando setCorePoolSize(int) si modifica la dimensione principale del pool. Le chiamate a questo metodo sono impostazioni thread-safe e override fornite al costruttore di ThreadPoolExecutor. Se si taglia la dimensione del pool, i thread rimanenti si spegneranno una volta completata la coda di lavoro corrente (se sono inattivi, si spegneranno immediatamente). Se si aumenta la dimensione del pool, i nuovi thread verranno allocati il ​​prima possibile. Il periodo di tempo per l'allocazione di nuovi thread non è documentato, ma nell'implementazione, l'allocazione di nuovi thread viene eseguita su ogni chiamata al metodo execute.

Per associare questo con una farm di lavoro sintonizzabile in fase di runtime, è possibile esporre questa proprietà (in base al wrapper o utilizzando un esportatore MBean dinamico) come attributo JMX di lettura-scrittura per creare un elemento piuttosto piacevole, al volo elaboratore batch sintonizzabile.

Per ridurre forzatamente la dimensione del pool in fase di esecuzione (che è la richiesta), è necessario creare una sottoclasse di ThreadPoolExecutor e aggiungere un'interruzione al metodo beforeExecute(Thread,Runnable). Interrompere il thread non è un'interruzione sufficiente, dal momento che solo interagisce con gli stati di attesa e durante l'elaborazione dei thread di attività ThreadPoolExecutor non entra in uno stato interrompibile.

Recentemente ho avuto lo stesso problema nel tentativo di arrestare forzatamente un pool di thread prima che tutte le attività inoltrate vengano eseguite. Per farlo, ho interrotto il thread generando un'eccezione di runtime solo dopo aver sostituito lo UncaughtExceptionHandler del thread con uno che si aspetta la mia specifica eccezione e lo scarta.

/** 
* A runtime exception used to prematurely terminate threads in this pool. 
*/ 
static class ShutdownException 
extends RuntimeException { 
    ShutdownException (String message) { 
     super(message); 
    } 
} 

/** 
* This uncaught exception handler is used only as threads are entered into 
* their shutdown state. 
*/ 
static class ShutdownHandler 
implements UncaughtExceptionHandler { 
    private UncaughtExceptionHandler handler; 

    /** 
    * Create a new shutdown handler. 
    * 
    * @param handler The original handler to deligate non-shutdown 
    * exceptions to. 
    */ 
    ShutdownHandler (UncaughtExceptionHandler handler) { 
     this.handler = handler; 
    } 
    /** 
    * Quietly ignore {@link ShutdownException}. 
    * <p> 
    * Do nothing if this is a ShutdownException, this is just to prevent 
    * logging an uncaught exception which is expected. Otherwise forward 
    * it to the thread group handler (which may hand it off to the default 
    * uncaught exception handler). 
    * </p> 
    */ 
    public void uncaughtException (Thread thread, Throwable throwable) { 
     if (!(throwable instanceof ShutdownException)) { 
      /* Use the original exception handler if one is available, 
      * otherwise use the group exception handler. 
      */ 
      if (handler != null) { 
       handler.uncaughtException(thread, throwable); 
      } 
     } 
    } 
} 
/** 
* Configure the given job as a spring bean. 
* 
* <p>Given a runnable task, configure it as a prototype spring bean, 
* injecting any necessary dependencices.</p> 
* 
* @param thread The thread the task will be executed in. 
* @param job The job to configure. 
* 
* @throws IllegalStateException if any error occurs. 
*/ 
protected void beforeExecute (final Thread thread, final Runnable job) { 
    /* If we're in shutdown, it's because spring is in singleton shutdown 
    * mode. This means we must not attempt to configure the bean, but 
    * rather we must exit immediately (prematurely, even). 
    */ 
    if (!this.isShutdown()) { 
     if (factory == null) { 
      throw new IllegalStateException(
       "This class must be instantiated by spring" 
       ); 
     } 

     factory.configureBean(job, job.getClass().getName()); 
    } 
    else { 
     /* If we are in shutdown mode, replace the job on the queue so the 
     * next process will see it and it won't get dropped. Further, 
     * interrupt this thread so it will no longer process jobs. This 
     * deviates from the existing behavior of shutdown(). 
     */ 
     workQueue.add(job); 

     thread.setUncaughtExceptionHandler(
      new ShutdownHandler(thread.getUncaughtExceptionHandler()) 
      ); 

     /* Throwing a runtime exception is the only way to prematurely 
     * cause a worker thread from the TheadPoolExecutor to exit. 
     */ 
     throw new ShutdownException("Terminating thread"); 
    } 
} 

Nel tuo caso, si consiglia di creare un semaforo (solo per l'uso come un contatore threadsafe), che non ha permesso, e quando spegnere le discussioni rilasciare per un numero di permessi che corrisponde al delta del la precedente dimensione del pool principale e la nuova dimensione del pool (che richiede l'intervento del metodo setCorePoolSize(int)). Questo ti permetterà di terminare i tuoi thread dopo che l'attività corrente è stata completata.

private Semaphore terminations = new Semaphore(0); 

protected void beforeExecute (final Thread thread, final Runnable job) { 
    if (terminations.tryAcquire()) { 
     /* Replace this item in the queue so it may be executed by another 
     * thread 
     */ 
     queue.add(job); 

     thread.setUncaughtExceptionHandler(
      new ShutdownHandler(thread.getUncaughtExceptionHandler()) 
      ); 

     /* Throwing a runtime exception is the only way to prematurely 
     * cause a worker thread from the TheadPoolExecutor to exit. 
     */ 
     throw new ShutdownException("Terminating thread"); 
    } 
} 

public void setCorePoolSize (final int size) { 
    int delta = getActiveCount() - size; 

    super.setCorePoolSize(size); 

    if (delta > 0) { 
     terminations.release(delta); 
    } 
} 

Questo dovrebbe interrompere n filettature per f (n) = attivo - richiesta. Se c'è qualche problema, la strategia di allocazione dello ThreadPoolExecutor s è abbastanza duratura. Il libro mantiene la cessazione anticipata utilizzando un blocco finally che garantisce l'esecuzione. Per questo motivo, anche se si terminano troppi thread, questi si ripopoleranno.

+1

Sì, so che è possibile apportare la modifica e averla effetto una volta che i thread sono inattivi. Tuttavia quello che ho trovato è che essere "inattivo" non si verifica quando un thread completa la sua attività, a meno che non ci siano altri compiti in attesa. Stavo chiedendo se ci fosse un modo per fare in modo che le modifiche diventassero effettive, così che anche se ci sono attività in attesa, saranno gestite dal nuovo numero desiderato di thread (come nel mio esempio, dove volevamo immediatamente comporre il numero quantità di risorse concorrenti utilizzate). –

+0

Ho aggiornato il mio post per descrivere una soluzione derivata da un problema che ho avuto di recente e di natura simile. Per favore, dai un'occhiata. –

+0

Non capisco, perché ottengo zero karma per questa risposta? :-( –

6

La soluzione consiste nel drenare la coda ThreadPoolExecutor, impostare la dimensione ThreadPoolExecutor in base alle esigenze e quindi aggiungere nuovamente i thread, uno per uno, non appena terminano gli altri. Il metodo per drenare la coda nella classe ThreadPoolExecutor è privato, quindi è necessario crearlo da solo. Ecco il codice:

/** 
* Drains the task queue into a new list. Used by shutdownNow. 
* Call only while holding main lock. 
*/ 
public static List<Runnable> drainQueue() { 
    List<Runnable> taskList = new ArrayList<Runnable>(); 
    BlockingQueue<Runnable> workQueue = executor.getQueue(); 
    workQueue.drainTo(taskList); 
    /* 
    * If the queue is a DelayQueue or any other kind of queue 
    * for which poll or drainTo may fail to remove some elements, 
    * we need to manually traverse and remove remaining tasks. 
    * To guarantee atomicity wrt other threads using this queue, 
    * we need to create a new iterator for each element removed. 
    */ 
    while (!workQueue.isEmpty()) { 
     Iterator<Runnable> it = workQueue.iterator(); 
     try { 
      if (it.hasNext()) { 
       Runnable r = it.next(); 
       if (workQueue.remove(r)) 
        taskList.add(r); 
      } 
     } catch (ConcurrentModificationException ignore) { 
     } 
    } 
    return taskList; 
} 

Prima di chiamare questo metodo è necessario ottenere e quindi rilasciare il blocco principale. Per fare ciò è necessario utilizzare java reflection perché il campo "mainLock" è privato. Ancora una volta, ecco il codice:

private Field getMainLock() throws NoSuchFieldException { 
    Field mainLock = executor.getClass().getDeclaredField("mainLock"); 
    mainLock.setAccessible(true); 
    return mainLock; 
} 

Dove "esecutore" è il vostro ThreadPoolExecutor.

Ora è necessario bloccare/sbloccare metodi:

public void lock() { 
    try { 
     Field mainLock = getMainLock(); 
     Method lock = mainLock.getType().getDeclaredMethod("lock", (Class[])null); 
     lock.invoke(mainLock.get(executor), (Object[])null); 
    } catch { 
     ... 
    } 
} 

public void unlock() { 
    try { 
     Field mainLock = getMainLock(); 
     mainLock.setAccessible(true); 
     Method lock = mainLock.getType().getDeclaredMethod("unlock", (Class[])null); 
     lock.invoke(mainLock.get(executor), (Object[])null); 
    } catch { 
     ... 
    } 
} 

Infine si può scrivere il metodo del "setThreadsNumber", e funzionerà sia aumentando e diminuendo la dimensione ThreadPoolExecutor:

public void setThreadsNumber(int intValue) { 
    boolean increasing = intValue > executor.getPoolSize(); 
    executor.setCorePoolSize(intValue); 
    executor.setMaximumPoolSize(intValue); 
    if(increasing){ 
     if(drainedQueue != null && (drainedQueue.size() > 0)){ 
      executor.submit(drainedQueue.remove(0)); 
     } 
    } else { 
     if(drainedQueue == null){ 
      lock(); 
      drainedQueue = drainQueue(); 
      unlock(); 
     } 
    } 
} 

Nota: ovviamente se si eseguono N thread paralleli e si cambia questo numero in N-1, tutti i thread N continueranno a funzionare. Quando finisce il primo thread, non verranno eseguiti nuovi thread. D'ora in poi il numero di thread parallelo sarà quello che hai scelto.

0

Avevo bisogno anche della stessa soluzione, e sembra che in JDK8 setCorePoolSize() e setMaximumPoolSize() producano effettivamente il risultato desiderato. Ho fatto un test in cui invio 4 compiti al pool e loro eseguono in modo conciso, ridimensiono le dimensioni del pool mentre sono in esecuzione e invio ancora un altro eseguibile che voglio essere solitario. Quindi ripristino la piscina alla sua dimensione originale. Qui è la sorgente di prova https://gist.github.com/southerton81/96e141b8feede3fe0b8f88f679bef381

produce il seguente output (filo "50" è quella che deve essere eseguito in isolamento)

run: 
test thread 2 enter 
test thread 1 enter 
test thread 3 enter 
test thread 4 enter 
test thread 1 exit 
test thread 2 exit 
test thread 3 exit 
test thread 4 exit 
test thread 50 enter 
test thread 50 exit 
test thread 1 enter 
test thread 2 enter 
test thread 3 enter 
test thread 4 enter 
test thread 1 exit 
test thread 2 exit 
test thread 3 exit 
test thread 4 exit 
Problemi correlati