2010-06-17 10 views
9

Sto lavorando su alcune applicazioni e utilizzo di ThreadPoolExecutor per la gestione di varie attività. ThreadPoolExecutor si blocca dopo una certa durata. Per simularlo in un ambiente più semplice, ho scritto un semplice codice in cui sono in grado di simulare il problema.Java ThreadPoolExecutor si blocca durante l'utilizzo di ArrayBlockingQueue

import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.RejectedExecutionHandler; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 

public class MyThreadPoolExecutor { 
    private int poolSize = 10; 
    private int maxPoolSize = 50; 
    private long keepAliveTime = 10; 
    private ThreadPoolExecutor threadPool = null; 
    private final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(
      100000); 

    public MyThreadPoolExecutor() { 
     threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize, 
       keepAliveTime, TimeUnit.SECONDS, queue); 
     threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { 

      @Override 
      public void rejectedExecution(Runnable runnable, 
        ThreadPoolExecutor threadPoolExecutor) { 
       System.out 
         .println("Execution rejected. Please try restarting the application."); 
      } 

     }); 
    } 

    public void runTask(Runnable task) { 
     threadPool.execute(task); 
    } 

    public void shutDown() { 
     threadPool.shutdownNow(); 
    } 
    public ThreadPoolExecutor getThreadPool() { 
     return threadPool; 
    } 

    public void setThreadPool(ThreadPoolExecutor threadPool) { 
     this.threadPool = threadPool; 
    } 

    public static void main(String[] args) { 
     MyThreadPoolExecutor mtpe = new MyThreadPoolExecutor(); 
     for (int i = 0; i < 1000; i++) { 
      final int j = i; 
      mtpe.runTask(new Runnable() { 

       @Override 
       public void run() { 
        System.out.println(j); 
       } 

      }); 
     } 
    } 
} 

Provare a eseguire questo codice alcune volte. Normalmente stampa il numero sulla console e quando tutti i thread terminano, esiste. Ma a volte, ha terminato tutte le attività e quindi non viene terminato. La discarica filo è come segue:

MyThreadPoolExecutor [Java Application] 
    MyThreadPoolExecutor at localhost:2619 (Suspended) 
     Daemon System Thread [Attach Listener] (Suspended) 
     Daemon System Thread [Signal Dispatcher] (Suspended)  
     Daemon System Thread [Finalizer] (Suspended)  
      Object.wait(long) line: not available [native method] 
      ReferenceQueue<T>.remove(long) line: not available  
      ReferenceQueue<T>.remove() line: not available  
      Finalizer$FinalizerThread.run() line: not available 
     Daemon System Thread [Reference Handler] (Suspended)  
      Object.wait(long) line: not available [native method] 
      Reference$Lock(Object).wait() line: 485 
      Reference$ReferenceHandler.run() line: not available  
     Thread [pool-1-thread-1] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-2] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-3] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-4] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-6] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-8] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-5] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-10] (Suspended) 
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-9] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-7] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [DestroyJavaVM] (Suspended) 
    C:\Program Files\Java\jre1.6.0_07\bin\javaw.exe (Jun 17, 2010 10:42:33 AM) 

Nella mia domanda effettiva, fili ThreadPoolExecutor andare in questo stato e quindi si blocca.

saluti, Ravi Rao

+0

Un'idea è che si prova ExecutorService, uno dei miei preferiti. –

+0

@Lars Andren, Un ThreadPoolExecutor è un ExecutorService. ExecutorService è semplicemente un'interfaccia. Nella libreria Java 1.5, ThreadPoolExecutor è l'unica implementazione diretta dell'interfaccia ExecutorService. C'è un AbstractExecutorService e un DelegatedExecutorService che non sono classi funzionali stand-alone. Inoltre, esiste un'interfaccia SheceduledExecutorService che estende ExecutorService e ha una singola implementazione concreta. –

risposta

9

nel metodo main, non hai mai chiamano mtpe.shutdown(). Un ThreadPoolExecutor tenterà di mantenere il suo corePoolSize attivo a tempo indeterminato. A volte, si è fortunati e si hanno più thread attivi di corePoolSize, quindi ogni thread di lavoro entrerà in un ramo logico condizionale che gli consentirà di terminare dopo il periodo di timeout specificato di 10 secondi. Tuttavia, come hai notato, a volte questo non è il caso, quindi ogni thread nell'esecutore si bloccherà su ArrayBlockingQueue.take() e attenderà una nuova attività.

Inoltre, si noti, vi è una differenza significativa tra ExecutorService.shutdown() e ExecutorService.shutdownNow(). Se si richiama ExecutorService.shutdownNow() come indicato dall'implementazione del wrapper, a volte verranno eliminate alcune attività che non sono state assegnate per l'esecuzione.

Aggiornamento: Dalla mia risposta originale, l'implementazione ThreadPoolExecutor è stata modificata in modo tale che il programma nel post originale non dovrebbe mai uscire.

Problemi correlati