2015-03-10 13 views
5

Sto cercando una buona soluzione per coordinare diverse attività di multithreading.Concorrenza Java: Coordinamento di più attività e annullamento

Fondamentalmente Ho 2 compiti, chiamo A e B che devono essere eseguite su un thread diverso dal thread principale.

Ma il B deve essere avviato dopo che A è stato completato. A e B possono contenere più parti che devono essere parallele, chiamate A1, A2, ... B1, B2, ....

E c'è un chiamante dall'esterno, che ha bisogno di riavviare l'intero lavoro indipendentemente dal progresso. Come posso ottenerlo? Ho pensato di creare una sorta di array booleano contenente le informazioni se ogni attività secondaria (A1, ...) ha già completato e in tal caso avviare B. E controllare ogni poche righe di codice in ciascun metodo se è già stata effettuata una cancellazione. Ma a me sembra che quella non sia una soluzione elegante e che ci siano modi per coordinare eccome questo.

enter image description here

+3

Volete ExecutorServices. E tu vuoi aspettare Futures. Tutto ciò è disponibile nelle pagine della documentazione Java di Oracle. – Fildor

+0

I tuoi sottoattività (A1, A2 ...) restituiscono qualche risultato? Se sì, è utile 'java.util.concurrent.FutureTask' per gestire tale processo: http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/FutureTask.html. Se no, usa 'CountDownLatch' dallo stesso pacchetto. –

+0

http://stackoverflow.com/questions/1361029/waiting-on-multiple-threads-to-complet-in-java – StanislavL

risposta

0

seguito è riportato un esempio di implementazione utilizzando countDownLatches e Exectors:

public class Test { 
    static ExecutorService maintaskExecutor = Executors.newFixedThreadPool(2); 

    private static CountDownLatch latch = new CountDownLatch(0); 


    public Test() { 
    } 


    public static void main(String[] args) { 
     maintaskExecutor.submit(new runnableA()); 
     maintaskExecutor.submit(new runnableB()); 

    } 

    private void restart() { 
     maintaskExecutor.shutdownNow(); 
     maintaskExecutor.submit(new runnableA()); 
     maintaskExecutor.submit(new runnableB()); 
    } 


    private static class runnableA implements Runnable { 
     ExecutorService taskExecutorA = Executors.newFixedThreadPool(3); 
     private final CountDownLatch latchA = new CountDownLatch(3); 


     @Override 
     public void run() { 

      try { 
       Runnable a1Runnable = createA1Runnable(); 
       Runnable a2Runnable = createA1Runnable(); 
       Runnable a3Runnable = createA1Runnable(); 

       taskExecutorA.submit(a1Runnable); 
       taskExecutorA.submit(a2Runnable); 
       taskExecutorA.submit(a3Runnable); 

       latchA.await(); 
       latch.countDown(); 
      } catch (InterruptedException e) { 
       taskExecutorA.shutdownNow(); 
      } 
     } 

     private Runnable createA1Runnable() { 
      return new Runnable() { 
       @Override 
       public void run() { 
        //Design this task to respond to interruption by checking if the thread has been interrupted 
        while(!Thread.interrupted()){ 
         //Do the work 
        } 

        return; 
       } 
      }; 
     } 
    } 

     private static class runnableB implements Runnable{ 
       private final CountDownLatch latch = new CountDownLatch(3); 
       ExecutorService taskExecutorB = Executors.newFixedThreadPool(3); 

       public void run(){ 
        try { 
        latch.await(); 
        //Creates the tasks B1, B2, ... 


       } catch (InterruptedException e) { 
        taskExecutorB.shutdownNow(); 
       } 
      } 
} 

} 
2

In Java8, è possibile utilizzare CompletableFutures. Il metodo execA imposta tre attività parallele e restituisce un CompletableFuture che comprende tutte queste attività. execB attende il completamento di questa attività composita e quindi imposta una serie di attività a sé stante. Infine, lo get nel metodo principale attende il completamento dei metodi B.

public class Futures { 
    String name; 
    int value; 

    public static void main(String[] args) { 
     try { 
     execB(execA()).get(); 
     } catch(InterruptedException|ExecutionException e) {} 
    } 
    Futures(String name, int value) { 
     this.name = name; 
     this.value = value; 
    } 

    void runMethod() { 
     System.out.println("Entering " + name); 
     try { 
      Thread.sleep(value * 1000); 
     } catch(InterruptedException e) {} 
     System.out.println("Exiting " + name); 
    } 
    public static CompletableFuture<Void> execA() { 
     return(
      CompletableFuture.<Void>allOf(
      CompletableFuture.runAsync(() -> (new Futures("a1", 4)).runMethod()), 
      CompletableFuture.runAsync(() -> (new Futures("a2", 2)).runMethod()), 
      CompletableFuture.runAsync(() -> (new Futures("a3", 1)).runMethod())) 
     ); 
    } 
    public static CompletableFuture<Void> execB(CompletableFuture<Void> prev) { 
     try { 
      prev.get(); 
     } catch (InterruptedException|ExecutionException e) {} 
     return(
      CompletableFuture.<Void>allOf(
      CompletableFuture.runAsync(() -> (new Futures("b1", 2)).runMethod()), 
      CompletableFuture.runAsync(() -> (new Futures("b2", 3)).runMethod()), 
      CompletableFuture.runAsync(() -> (new Futures("b3", 1)).runMethod()))); 
    } 
} 
0

Se c'è bisogno di uscite dei compiti parziali, è possibile utilizzare thenCombine in CompletableFuture

CompletableFuture<String> a1 = CompletableFuture.supplyAsync(() -> "a1"); 
    CompletableFuture<String> a2 = CompletableFuture.supplyAsync(() -> "a2"); 
    CompletableFuture<String> a3 = CompletableFuture.supplyAsync(() -> "a3"); 
    CompletableFuture<String> a = a1.thenCombine(a2, (a1r, a2r) -> "combination of a1 and a2").thenCombine(a3, 
      (a1anda2r, a3r) -> "combination of a1,a2,a3"); 

    CompletableFuture<String> b1 = CompletableFuture.supplyAsync(() -> "a1"); 
    CompletableFuture<String> b2 = CompletableFuture.supplyAsync(() -> "a2"); 
    CompletableFuture<String> b3 = CompletableFuture.supplyAsync(() -> "a3"); 
    CompletableFuture<String> b = a.thenCombine(b1, (ar, b1r) -> "combination of a and b1") 
      .thenCombine(b2, (aAndb1, b2r) -> "combination of a,b1,b2") 
      .thenCombine(b3, (aAndb1Andb2, b3r) -> "combination of A and B"); 

Se non è necessaria l'uscita, è possibile utilizzare la soluzione allof da Neil Masson

Problemi correlati