2012-04-03 21 views
5

Desidero utilizzare un CompletionService per elaborare i risultati da una serie di thread quando sono stati completati. Ho il servizio in un ciclo per prendere gli oggetti Future che fornisce man mano che diventano disponibili, ma non conosco il modo migliore per determinare quando tutti i thread hanno completato (e quindi per uscire dal ciclo):Come sapere quando un CompletionService ha finito di fornire risultati?

import java.util.concurrent.Callable; 
import java.util.concurrent.CompletionService; 
import java.util.concurrent.ExecutorCompletionService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 
import java.util.concurrent.ThreadPoolExecutor; 

public class Bar { 

    final static int MAX_THREADS = 4; 
    final static int TOTAL_THREADS = 20; 

    public static void main(String[] args) throws Exception{ 

     final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(MAX_THREADS); 
     final CompletionService<Integer> service = new ExecutorCompletionService<Integer>(threadPool); 

     for (int i=0; i<TOTAL_THREADS; i++){ 
      service.submit(new MyCallable(i)); 
     } 

     int finished = 0; 
     Future<Integer> future = null; 
     do{ 
      future = service.take(); 
      int result = future.get(); 
      System.out.println(" took: " + result); 
      finished++;    

     }while(finished < TOTAL_THREADS); 

     System.out.println("Shutting down"); 
     threadPool.shutdown(); 
    } 


    public static class MyCallable implements Callable<Integer>{ 

     final int id; 

     public MyCallable(int id){ 
      this.id = id; 
      System.out.println("Submitting: " + id); 
     } 

     @Override 
     public Integer call() throws Exception { 
      Thread.sleep(1000); 
      System.out.println("finished: " + id); 
      return id; 
     } 
    } 
} 

Ho provato a verificare lo stato di ThreadPoolExecutor, ma so che i metodi getCompletedTaskCount e getTaskCount sono solo approssimazioni e non dovrebbero essere considerati affidabili. C'è un modo migliore per assicurarmi di aver recuperato tutti i Futures dal CompletionService di contarli da soli?


Edit: Sia il link che Nobeh fornito, e this link suggeriscono che contando il numero di attività presentate, quindi chiamando prendere() che molte volte, è la strada da percorrere. Sono solo sorpreso che non ci sia un modo per chiedere al CompletionService o al suo Executor cosa rimane da restituire.

risposta

3

Rispondere a queste domande ti dà la risposta?

  • Le attività asincrone creano altre attività inviate a CompletionService?
  • È service l'unico oggetto che deve gestire le attività create nell'applicazione?

Sulla base reference documentation, CompletionService agisce su un/approccio produttore consumatore e sfrutta una interna Executor. Quindi, finché produci i compiti in un posto e li consumi in un altro posto, il numero di telefono CompletionService.take() indicherà se ci sono altri risultati da distribuire.

Credo che anche this question ti sia d'aiuto.

+0

Grazie, nobile. Sembra che anche loro si colleghino semplicemente al conteggio dei thread, nel loro ciclo "for (int tasksHandled = 0; tasksHandled

+1

L'esempio nell'API utilizza lo stesso approccio dell'esecuzione take() n volte di seguito. http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ExecutorCompletionService.html –

+0

Se i risultati di CompletionService in un thread diverso rispetto a quelli inviati a Exector, il thread è sicuro? – raffian

3

Vedere http://www.javaspecialists.eu/archive/Issue214.html per un suggerimento decente su come estendere ExecutorCompletionService per fare ciò che si sta cercando. Ho incollato il codice pertinente qui sotto per la vostra convenienza. L'autore suggerisce anche di rendere il servizio implementabile Iterable, che credo sarebbe una buona idea.

FWIW, sono d'accordo con te sul fatto che questo dovrebbe essere parte dell'implementazione standard, ma purtroppo non lo è.

import java.util.concurrent.*; 
import java.util.concurrent.atomic.*; 

public class CountingCompletionService<V> extends ExecutorCompletionService<V> { 
    private final AtomicLong submittedTasks = new AtomicLong(); 
    private final AtomicLong completedTasks = new AtomicLong(); 

    public CountingCompletionService(Executor executor) { 
    super(executor); 
    } 

    public CountingCompletionService(
     Executor executor, BlockingQueue<Future<V>> queue) { 
    super(executor, queue); 
    } 

    public Future<V> submit(Callable<V> task) { 
    Future<V> future = super.submit(task); 
    submittedTasks.incrementAndGet(); 
    return future; 
    } 

    public Future<V> submit(Runnable task, V result) { 
    Future<V> future = super.submit(task, result); 
    submittedTasks.incrementAndGet(); 
    return future; 
    } 

    public Future<V> take() throws InterruptedException { 
    Future<V> future = super.take(); 
    completedTasks.incrementAndGet(); 
    return future; 
    } 

    public Future<V> poll() { 
    Future<V> future = super.poll(); 
    if (future != null) completedTasks.incrementAndGet(); 
    return future; 
    } 

    public Future<V> poll(long timeout, TimeUnit unit) 
     throws InterruptedException { 
    Future<V> future = super.poll(timeout, unit); 
    if (future != null) completedTasks.incrementAndGet(); 
    return future; 
    } 

    public long getNumberOfCompletedTasks() { 
    return completedTasks.get(); 
    } 

    public long getNumberOfSubmittedTasks() { 
    return submittedTasks.get(); 
    } 

    public boolean hasUncompletedTasks() { 
    return completedTasks.get() < submittedTasks.get(); 
    } 
} 
Problemi correlati