2010-01-28 13 views
25

Ho un processo che delega le attività asynch a un pool di thread. Devo assicurarmi che determinati compiti siano eseguiti in ordine. Così, per esempioControllo dell'ordine di esecuzione delle attività con ExecutorService

Compiti arrivano in ordine

Compiti A1, B1, C1, D1, E1, A2, A3, B2, f1

Le attività possono essere eseguite in qualsiasi ordine, tranne dove c'è una dipendenza naturale, quindi a1, a2, a3 devono essere elaborati in questo ordine allocando allo stesso thread o bloccandoli fino a quando non so che il precedente # è stato completato.

Attualmente non utilizza il pacchetto Java Concurrency, ma sto considerando di passare a prendere in considerazione la gestione dei thread.

Qualcuno ha una soluzione o suggerimenti di come raggiungere questo obiettivo

risposta

2

simile Quando si invia una Runnable o Callable a un ExecutorService si riceve un Future in cambio. Fai passare i thread che dipendono da a1 da Future a1 e chiama Future.get(). Questo bloccherà fino al completamento del thread.

Quindi:

ExecutorService exec = Executor.newFixedThreadPool(5); 
Runnable a1 = ... 
final Future f1 = exec.submit(a1); 
Runnable a2 = new Runnable() { 
    @Override 
    public void run() { 
    f1.get(); 
    ... // do stuff 
    } 
} 
exec.submit(a2); 

e così via.

+4

Non penso che questo funzionerà con un pool di thread fisso, come i fili potrebbero tutti blocco 'f1.get()' in una sola volta ed essere un punto morto. – finnw

+0

Accordare le dimensioni del pool in modo appropriato. – cletus

+0

Oppure utilizzare un pool di thread memorizzato nella cache. – finnw

2

Un'altra opzione è creare il proprio esecutore, chiamarlo OrderedExecutor e creare una matrice di oggetti ThreadPoolExecutor incapsulati, con 1 thread per esecutore interno. È quindi fornire un meccanismo per la scelta di uno degli oggetti interni, ad esempio, si può fare questo, fornendo un'interfaccia che l'utente della classe può implementare:

 
executor = new OrderedExecutor(10 /* pool size */, new OrderedExecutor.Chooser() { 
    public int choose(Runnable runnable) { 
    MyRunnable myRunnable = (MyRunnable)runnable; 
    return myRunnable.someId(); 
    }); 

executor.execute(new MyRunnable()); 

L'implementazione di OrderedExecutor.execute() sarà quindi utilizzare Scelta Risorse per ottenere un int, mod questo con la dimensione del pool e questo è il tuo indice nell'array interno. L'idea è che "someId()" restituirà lo stesso valore per tutti i "a", ecc.

12

Quando ho fatto questo in passato ho avuto di solito l'ordine gestito da un componente che quindi invia callables/runnables a un Executor.

Qualcosa come.

  • Hai una lista di attività da eseguire, alcuni con le dipendenze
  • creare un esecutore e avvolgere con un'ExecutorCompletionService
  • Cerca tutti i compiti, qualsiasi senza dipendenze, li pianificare tramite il servizio di completamento
  • Sondaggio il servizio di completamento
  • Come ogni attività viene completata
    • aggiungerlo a un elenco "completato"
    • Rivalutare eventuali attività di attesa nella "lista completa" per vedere se sono "dipendenti completati".Se è così pianificarne
    • Sciacquare ripetere fino a quando tutte le attività sono presentate/completato

Il servizio di completamento è un bel modo di essere in grado di ottenere i compiti in quanto completa, piuttosto che cercare di prelevare un po 'di Futures. Tuttavia, probabilmente vorrai mantenere un Map<Future, TaskIdentifier> che viene popolato quando un'attività è pianificata tramite il servizio di completamento in modo che quando il servizio di completamento ti offre un futuro completo, puoi capire quale sia lo .

Se ti trovi in ​​uno stato in cui le attività sono ancora in attesa di essere eseguite, ma non è in esecuzione nulla e non è possibile pianificare nulla, allora hai un problema di dipendenza circolare.

9

Scrivo proprio Esecutore che garantisce l'ordine delle attività per le attività con la stessa chiave. Utilizza la mappa delle code per le attività di ordine con la stessa chiave. Ogni attività con chiave esegue l'attività successiva con la stessa chiave.

Questa soluzione non gestisce RejectedExecutionException o altre eccezioni dall'Esecutore delegato! Quindi l'esecutore delegato dovrebbe essere "illimitato".

import java.util.HashMap; 
import java.util.LinkedList; 
import java.util.Map; 
import java.util.Queue; 
import java.util.concurrent.Executor; 

/** 
* This Executor warrants task ordering for tasks with same key (key have to implement hashCode and equal methods correctly). 
*/ 
public class OrderingExecutor implements Executor{ 

    private final Executor delegate; 
    private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<Object, Queue<Runnable>>(); 

    public OrderingExecutor(Executor delegate){ 
     this.delegate = delegate; 
    } 

    @Override 
    public void execute(Runnable task) { 
     // task without key can be executed immediately 
     delegate.execute(task); 
    } 

    public void execute(Runnable task, Object key) { 
     if (key == null){ // if key is null, execute without ordering 
      execute(task); 
      return; 
     } 

     boolean first; 
     Runnable wrappedTask; 
     synchronized (keyedTasks){ 
      Queue<Runnable> dependencyQueue = keyedTasks.get(key); 
      first = (dependencyQueue == null); 
      if (dependencyQueue == null){ 
       dependencyQueue = new LinkedList<Runnable>(); 
       keyedTasks.put(key, dependencyQueue); 
      } 

      wrappedTask = wrap(task, dependencyQueue, key); 
      if (!first) 
       dependencyQueue.add(wrappedTask); 
     } 

     // execute method can block, call it outside synchronize block 
     if (first) 
      delegate.execute(wrappedTask); 

    } 

    private Runnable wrap(Runnable task, Queue<Runnable> dependencyQueue, Object key) { 
     return new OrderedTask(task, dependencyQueue, key); 
    } 

    class OrderedTask implements Runnable{ 

     private final Queue<Runnable> dependencyQueue; 
     private final Runnable task; 
     private final Object key; 

     public OrderedTask(Runnable task, Queue<Runnable> dependencyQueue, Object key) { 
      this.task = task; 
      this.dependencyQueue = dependencyQueue; 
      this.key = key; 
     } 

     @Override 
     public void run() { 
      try{ 
       task.run(); 
      } finally { 
       Runnable nextTask = null; 
       synchronized (keyedTasks){ 
        if (dependencyQueue.isEmpty()){ 
         keyedTasks.remove(key); 
        }else{ 
         nextTask = dependencyQueue.poll(); 
        } 
       } 
       if (nextTask!=null) 
        delegate.execute(nextTask); 
      } 
     } 
    } 
} 
+0

+1. Grazie per quello.Userò questo impianto, ma davvero non so come questo non è contrassegnato come la risposta finale per la domanda. –

0

In Habanero-Java library, c'è un concetto di operazioni basate sui dati che possono essere usate per esprimere dipendenze tra compiti, evitando operazioni di filettatura bloccante. Sotto le copertine, la libreria Habanero-Java utilizza JDKs ForkJoinPool (cioè un ExecutorService).

Ad esempio, il caso d'uso per le attività di A1, A2, A3, ... potrebbe essere espresso come segue:

HjFuture a1 = future(() -> { doA1(); return true; }); 
HjFuture a2 = futureAwait(a1,() -> { doA2(); return true; }); 
HjFuture a3 = futureAwait(a2,() -> { doA3(); return true; }); 

noti che a1, a2, a3 e sono solo riferimenti a oggetti di tipo HjFuture e può essere mantenuto nelle strutture dati personalizzate per specificare le dipendenze come e quando le attività A2 e A3 entrano in fase di esecuzione.

Ci sono alcuni tutorial slides available. È possibile trovare ulteriore documentazione come javadoc, API summary e primers.

0

È possibile utilizzare Executors.newSingleThreadExecutor(), ma utilizzerà solo un thread per eseguire le attività. Un'altra opzione è usare CountDownLatch. Ecco un semplice esempio:

public class Main2 { 

public static void main(String[] args) throws InterruptedException { 

    final CountDownLatch cdl1 = new CountDownLatch(1); 
    final CountDownLatch cdl2 = new CountDownLatch(1); 
    final CountDownLatch cdl3 = new CountDownLatch(1); 

    List<Runnable> list = new ArrayList<Runnable>(); 
    list.add(new Runnable() { 
     public void run() { 
      System.out.println("Task 1"); 

      // inform that task 1 is finished 
      cdl1.countDown(); 
     } 
    }); 

    list.add(new Runnable() { 
     public void run() { 
      // wait until task 1 is finished 
      try { 
       cdl1.await(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 

      System.out.println("Task 2"); 

      // inform that task 2 is finished 
      cdl2.countDown(); 
     } 
    }); 

    list.add(new Runnable() { 
     public void run() { 
      // wait until task 2 is finished 
      try { 
       cdl2.await(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 

      System.out.println("Task 3"); 

      // inform that task 3 is finished 
      cdl3.countDown(); 
     } 
    }); 

    ExecutorService es = Executors.newFixedThreadPool(200); 
    for (int i = 0; i < 3; i++) { 
     es.submit(list.get(i)); 
    } 

    es.shutdown(); 
    es.awaitTermination(1, TimeUnit.MINUTES); 
} 
} 
0

Ho creato un OrderingExecutor per questo problema. Se si passa la stessa chiave al metodo execute() con runnables diversi, l'esecuzione dei runnables con la stessa chiave sarà nell'ordine in cui viene chiamato execute() e non si sovrapporrà mai.

import java.util.Arrays; 
import java.util.Collection; 
import java.util.Iterator; 
import java.util.Queue; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ConcurrentLinkedQueue; 
import java.util.concurrent.ConcurrentMap; 
import java.util.concurrent.Executor; 

/** 
* Special executor which can order the tasks if a common key is given. 
* Runnables submitted with non-null key will guaranteed to run in order for the same key. 
* 
*/ 
public class OrderedExecutor { 

    private static final Queue<Runnable> EMPTY_QUEUE = new QueueWithHashCodeAndEquals<Runnable>(
      new ConcurrentLinkedQueue<Runnable>()); 

    private ConcurrentMap<Object, Queue<Runnable>> taskMap = new ConcurrentHashMap<Object, Queue<Runnable>>(); 
    private Executor delegate; 
    private volatile boolean stopped; 

    public OrderedExecutor(Executor delegate) { 
     this.delegate = delegate; 
    } 

    public void execute(Runnable runnable, Object key) { 
     if (stopped) { 
      return; 
     } 

     if (key == null) { 
      delegate.execute(runnable); 
      return; 
     } 

     Queue<Runnable> queueForKey = taskMap.computeIfPresent(key, (k, v) -> { 
      v.add(runnable); 
      return v; 
     }); 
     if (queueForKey == null) { 
      // There was no running task with this key 
      Queue<Runnable> newQ = new QueueWithHashCodeAndEquals<Runnable>(new ConcurrentLinkedQueue<Runnable>()); 
      newQ.add(runnable); 
      // Use putIfAbsent because this execute() method can be called concurrently as well 
      queueForKey = taskMap.putIfAbsent(key, newQ); 
      if (queueForKey != null) 
       queueForKey.add(runnable); 
      delegate.execute(new InternalRunnable(key)); 
     } 
    } 

    public void shutdown() { 
     stopped = true; 
     taskMap.clear(); 
    } 

    /** 
    * Own Runnable used by OrderedExecutor. 
    * The runnable is associated with a specific key - the Queue&lt;Runnable> for this 
    * key is polled. 
    * If the queue is empty, it tries to remove the queue from taskMap. 
    * 
    */ 
    private class InternalRunnable implements Runnable { 

     private Object key; 

     public InternalRunnable(Object key) { 
      this.key = key; 
     } 

     @Override 
     public void run() { 
      while (true) { 
       // There must be at least one task now 
       Runnable r = taskMap.get(key).poll(); 
       while (r != null) { 
        r.run(); 
        r = taskMap.get(key).poll(); 
       } 
       // The queue emptied 
       // Remove from the map if and only if the queue is really empty 
       boolean removed = taskMap.remove(key, EMPTY_QUEUE); 
       if (removed) { 
        // The queue has been removed from the map, 
        // if a new task arrives with the same key, a new InternalRunnable 
        // will be created 
        break; 
       } // If the queue has not been removed from the map it means that someone put a task into it 
        // so we can safely continue the loop 
      } 
     } 
    } 

    /** 
    * Special Queue implementation, with equals() and hashCode() methods. 
    * By default, Java SE queues use identity equals() and default hashCode() methods. 
    * This implementation uses Arrays.equals(Queue::toArray()) and Arrays.hashCode(Queue::toArray()). 
    * 
    * @param <E> The type of elements in the queue. 
    */ 
    private static class QueueWithHashCodeAndEquals<E> implements Queue<E> { 

     private Queue<E> delegate; 

     public QueueWithHashCodeAndEquals(Queue<E> delegate) { 
      this.delegate = delegate; 
     } 

     public boolean add(E e) { 
      return delegate.add(e); 
     } 

     public boolean offer(E e) { 
      return delegate.offer(e); 
     } 

     public int size() { 
      return delegate.size(); 
     } 

     public boolean isEmpty() { 
      return delegate.isEmpty(); 
     } 

     public boolean contains(Object o) { 
      return delegate.contains(o); 
     } 

     public E remove() { 
      return delegate.remove(); 
     } 

     public E poll() { 
      return delegate.poll(); 
     } 

     public E element() { 
      return delegate.element(); 
     } 

     public Iterator<E> iterator() { 
      return delegate.iterator(); 
     } 

     public E peek() { 
      return delegate.peek(); 
     } 

     public Object[] toArray() { 
      return delegate.toArray(); 
     } 

     public <T> T[] toArray(T[] a) { 
      return delegate.toArray(a); 
     } 

     public boolean remove(Object o) { 
      return delegate.remove(o); 
     } 

     public boolean containsAll(Collection<?> c) { 
      return delegate.containsAll(c); 
     } 

     public boolean addAll(Collection<? extends E> c) { 
      return delegate.addAll(c); 
     } 

     public boolean removeAll(Collection<?> c) { 
      return delegate.removeAll(c); 
     } 

     public boolean retainAll(Collection<?> c) { 
      return delegate.retainAll(c); 
     } 

     public void clear() { 
      delegate.clear(); 
     } 

     @Override 
     public boolean equals(Object obj) { 
      if (!(obj instanceof QueueWithHashCodeAndEquals)) { 
       return false; 
      } 
      QueueWithHashCodeAndEquals<?> other = (QueueWithHashCodeAndEquals<?>) obj; 
      return Arrays.equals(toArray(), other.toArray()); 
     } 

     @Override 
     public int hashCode() { 
      return Arrays.hashCode(toArray()); 
     } 

    } 

} 
Problemi correlati