2015-12-10 30 views
7

Ho un processo che deve calcolare molte piccole attività in parallelo e quindi elaborare i risultati nell'ordine naturale delle attività. Per fare questo, ho la seguente configurazione:Blocco thread thread fisso, quando sono state inviate abbastanza attività

Un semplice ExecutorService, e una coda di blocco userò per tenere gli oggetti Future restituiti quando un richiamabile viene presentata al esecutore:

ExecutorService exec = Executors.newFixedThreadPool(15); 
LinkedBlockingQueue<Future<MyTask>> futures = new LinkedBlockingQueue<Future<MyTask>>(15 * 64); 

Alcuni codice di debug per contare il numero di presentate e numero di compiti elaborati e scriverli su base regolare (si noti che processed viene incrementato al termine del codice compito stesso):

AtomicLong processed = new AtomicLong(0); 
AtomicLong submitted = new AtomicLong(0); 

Timer statusTimer = new Timer(); 
statusTimer.schedule(new TimerTask() { 
     @Override 
     public void run() { 
     l.info("Futures: " + futures.size() + "; Submitted: " + submitted.get() + "; Processed: " + processed.get() + "; Diff: " + (submitted.get() - processed.get()))); 
     }    
}, 60 * 1000, 60 * 1000); 

un filo che prende compiti da una coda (che è tutto ya generatore) e li sottopone l'esecutore, mettendo il futuro con conseguente coda futures (questo è come mi assicuro che non invio troppi compiti di esaurire la memoria):

Thread submitThread = new Thread(() -> 
{ 
    MyTask task; 
    try { 
     while ((task = taskQueue.poll()) != null) { 
      futures.put(exec.submit(task)); 
      submitted.incrementAndGet(); 
     } 
    } catch (Exception e) {l .error("Unexpected Exception", e);} 
}, "SubmitTasks"); 
submitThread.start(); 

Il thread corrente allora take -s compiti finiti dalla coda futures e gestisce il risultato:

while (!futures.isEmpty() || submitThread.isAlive()) { 
    MyTask task = futures.take().get(); 
    //process result 
} 

Quando ho eseguito questo su un server con 8 core (si noti che il codice utilizza attualmente 15 thread) i picchi di utilizzo della CPU a circa il 60% solo . Vedo il mio output di debug in questo modo:

INFO : Futures: 960; Submitted: 1709710114; Processed: 1709709167; Diff: 947 
INFO : Futures: 945; Submitted: 1717159751; Processed: 1717158862; Diff: 889 
INFO : Futures: 868; Submitted: 1724597808; Processed: 1724596954; Diff: 853 
INFO : Futures: 940; Submitted: 1732030120; Processed: 1732029252; Diff: 871 
INFO : Futures: 960; Submitted: 1739538576; Processed: 1739537758; Diff: 818 
INFO : Futures: 960; Submitted: 1746965761; Processed: 1746964811; Diff: 950 

Un dump filo mostra che molti dei fili pool di thread stanno bloccando in questo modo:

"pool-1-thread-14" #30 prio=5 os_prio=0 tid=0x00007f25c802c800 nid=0x10b2 waiting on condition [0x00007f26151d5000] 
    java.lang.Thread.State: WAITING (parking) 
     at sun.misc.Unsafe.park(Native Method) 
     - parking to wait for <0x00007f2fbb0001b0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) 
     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) 
     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) 
     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:897) 
     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222) 
     at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335) 
     at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:439) 
     at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

La mia interpretazione dell'output di debug è che in un qualsiasi punto nel tempo, ho almeno alcune centinaia di attività che sono state inviate al servizio executor, ma non sono state elaborate (posso anche confermare nello stack trace che il thread SubmitTasks è bloccato su LinkedBlockingQueue.put). Tuttavia, lo stack trace (e le statistiche di utilizzo del server) mi mostrano che il servizio Executor è bloccato su LinkedBlockingQueue.take (suppongo che la coda di attività interna sia vuota).

Cosa sto leggendo male?

risposta

0

I thread che interessano BlockingQueue s sono sempre difficili. Basta guardare il tuo codice senza mai correre con la scala che fai. Ho alcuni suggerimenti. È il consiglio di molti esperti del settore come Jessica Kerr che non dovresti mai bloccare per sempre. Quello che puoi fare è usare metodi con timeout in LinkedBlockingQueue.

Thread submitThread = new Thread(() -> 
{ 
    MyTask task; 
    try { 
     while ((task = taskQueue.peek()) != null) { 
      boolean success = futures.offer(exec.submit(task), 1000, TimeUnit.MILLISECONDS); 
      if(success) { 
       submitted.incrementAndGet(); 
       taskQueue.remove(task); 
      } 
     } 
    } catch (Exception e) {l .error("Unexpected Exception", e);} 
}, "SubmitTasks"); 
submitThread.start(); 

E anche qui.

while (!futures.isEmpty() || submitThread.isAlive()) { 
    Future<MyTask> f = futures.poll(1000, TimeUnit.MILLISECONDS); 
    if(f != null) { 
     MyTask task = f.get(); 
    } 
    //process result 
} 

Guarda questo video di Jessica Kerr su Concurrency tools in JVM

Problemi correlati