2009-04-30 15 views
43

Sto implementando un meccanismo di pooling di thread in cui vorrei eseguire compiti con priorità diverse. Mi piacerebbe avere un buon meccanismo per cui posso inviare un compito ad alta priorità al servizio e averlo programmato prima di altre attività. La priorità dell'attività è una proprietà intrinseca del compito stesso (se esprimo quell'attività come Callable o Runnable non è importante per me).Come si implementa la priorità delle attività utilizzando un ExecutorService in Java 5?

Ora, superficialmente sembra che potrei usare un PriorityBlockingQueue come coda compito nel mio ThreadPoolExecutor, ma che coda contiene Runnable oggetti, che possono o non possono essere i compiti Runnable ho che le vengono sottoposte. Inoltre, se ho inviato attività Callable, non è chiaro come questo possa mai essere mappato.

C'è un modo per farlo? Preferirei davvero non tirare il mio per questo, dal momento che ho molte più probabilità di sbagliare in questo modo.

(An da parte; sì, sono consapevole della possibilità di morire di fame per i lavori di bassa priorità in qualcosa di simile Punti extra() per soluzioni che hanno una ragionevole garanzia di equità.?!)

+3

Interessante domanda. A mio parere, questo sembra un po 'di supervisione nell'API. –

+0

Se dovessi indovinare perché non fa parte dell'API, direi che probabilmente è perché il problema della fame è difficile. Avrebbero bisogno di fornire una nuova serie di primitivi per l'equità e l'escalation; cose come must-execute-by e may-be-indefinitely-deferred (nota che sto tirando fuori questi nomi dal mio culo). Potrei desiderare che l'abbiano fatto, ma non li biasimo :) –

+0

Sì, questo ha senso. Sembra che sarebbe una buona cosa avere, però, ma quando pensi di dover essenzialmente scrivere un algoritmo di programmazione della CPU in Java probabilmente stai facendo qualcosa di sbagliato. –

risposta

8

A prima vista sembrerebbe che si potrebbe definire un'interfaccia per le attività che si estende Runnable o Callable<T> e Comparable. Quindi impacchetta uno ThreadPoolExecutor con un PriorityBlockingQueue come coda e accetta solo le attività che implementano l'interfaccia.

Prendendo in considerazione il tuo commento, sembra che un'opzione sia quella di estendere ThreadPoolExecutor e sovrascrivere i metodi submit(). Fare riferimento a AbstractExecutorService per vedere come appaiono quelli predefiniti; tutto ciò che fanno è avvolgere il Runnable o Callable in un FutureTask e execute() esso. Lo farei probabilmente scrivendo una classe wrapper che implementa ExecutorService e delega a un interno anonimo ThreadPoolExecutor. Avvolgili in qualcosa che ha la tua priorità, in modo che il tuo Comparator possa ottenerlo.

+2

Anche quello era il mio, ma ecco il problema; le istanze 'Runnable' che vengono passate alla coda di priorità sono _non_ le attività che io' invio' direttamente, sono racchiuse in un 'java.util.concurrent.FutureTask ' che, naturalmente, non è ordinato allo stesso modo. Se uso 'execute' - che per esempio non accetta 'Callable' - quindi getta i miei oggetti in. –

+0

Hmm, questo complica le cose. Ho pensato che c'era qualcosa che mi mancava. –

+0

Dirò. Sto ancora battendo, ma ... Beh, basti dire che è un dolore :) –

0

Would è possibile avere uno ThreadPoolExecutor per ogni livello di priorità? A ThreadPoolExecutor può essere instanciated con ThreadFactory e si può avere la propria implementazione di un ThreadFactory per impostare i diversi livelli di priorità.

class MaxPriorityThreadFactory implements ThreadFactory { 
    public Thread newThread(Runnable r) { 
     Thread thread = new Thread(r); 
     thread.setPriority(Thread.MAX_PRIORITY); 
    } 
} 
+1

La priorità del filo non è molto importante per me qui; i compiti stessi tenderanno ad essere eseguiti in tempi ragionevolmente rapidi (l'obiettivo è di portarli a ~ 50ms ciascuno), quindi la programmazione dei thread è meno problematica. È la priorità dei compiti relativi l'uno all'altro che è in discussione qui. –

+0

Devono essere eseguiti in un certo ordine? – willcodejavaforfood

+0

Non c'è nessuno, vero ordine, no, ma le attività che arrivano più tardi ma sono di priorità più alta _should_ vengono eseguite prima delle attività che arrivano più tardi, ma hanno una priorità inferiore. Di nuovo, con una certa garanzia di equità per prevenire la fame. –

16

Ho risolto questo problema in modo ragionevole, e lo descriverò qui di seguito per riferimento futuro a me stesso e chiunque altro si imbattesse in questo problema con le librerie Java Concurrent.

L'utilizzo di un PriorityBlockingQueue come mezzo per trattenere le attività per l'esecuzione successiva è effettivamente un movimento nella direzione corretta. Il problema è che il PriorityBlockingQueue deve essere istanziato genericamente per contenere istanze Runnable ed è impossibile chiamare compareTo (o simile) su un'interfaccia Runnable.

Onto a risolvere il problema. Quando si crea l'Executor, deve essere assegnato a PriorityBlockingQueue.La coda dovrebbe inoltre essere dato un comparatore personalizzato per fare una corretta in luogo di smistamento:

new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator()); 

Ora, una sbirciatina al CustomTaskComparator:

public class CustomTaskComparator implements Comparator<MyType> { 

    @Override 
    public int compare(MyType first, MyType second) { 
     return comparison; 
    } 

} 

Tutto cercando piuttosto dritto in avanti fino a questo punto. Diventa un po 'appiccicoso qui. Il nostro prossimo problema è affrontare la creazione di FutureTasks dall'Esecutore. Nel esecutore, dobbiamo ignorare newTaskFor come così:

@Override 
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) { 
    //Override the default FutureTask creation and retrofit it with 
    //a custom task. This is done so that prioritization can be accomplished. 
    return new CustomFutureTask(c); 
} 

Dove c è il compito Callable che stiamo cercando di eseguire. Ora, diamo uno sguardo a CustomFutureTask:

public class CustomFutureTask extends FutureTask { 

    private CustomTask task; 

    public CustomFutureTask(Callable callable) { 
     super(callable); 
     this.task = (CustomTask) callable; 
    } 

    public CustomTask getTask() { 
     return task; 
    } 

} 

Avviso il metodo getTask. Lo useremo in seguito per estrapolare l'attività originale da questo CustomFutureTask che abbiamo creato.

E, infine, modifichiamo il compito originale che stavamo cercando di eseguire:

public class CustomTask implements Callable<MyType>, Comparable<CustomTask> { 

    private final MyType myType; 

    public CustomTask(MyType myType) { 
     this.myType = myType; 
    } 

    @Override 
    public MyType call() { 
     //Do some things, return something for FutureTask implementation of `call`. 
     return myType; 
    } 

    @Override 
    public int compareTo(MyType task2) { 
     return new CustomTaskComparator().compare(this.myType, task2.myType); 
    } 

} 

Si può vedere che implementiamo Comparable nel compito di delegare al reale Comparator per MyType.

E ce l'hai, prioritizzazione personalizzata per un Executor che utilizza le librerie Java! Ci vuole un po 'di flessione, ma è il più pulito che sono riuscito a trovare. Spero che questo sia utile a qualcuno!

+1

Esistono alcune limitazioni intrinseche a questo meccanismo. Ad esempio, il primo runnable/callable passato all'esecutore non va in coda. Pertanto, il meccanismo di priorità si applica solo quando le attività vengono accodate e ciò accade quando il numero di corridori correnti supera il numero di thread massimo nella dimensione del pool (qui 1). – Snicolas

+0

In CustomTask, non dovresti installare un oggetto per ogni confronto, questo rallenterà parecchio. – Snicolas

+0

dove si usa il getTask? –

4

È possibile utilizzare queste classi di supporto:

public class PriorityFuture<T> implements RunnableFuture<T> { 

    private RunnableFuture<T> src; 
    private int priority; 

    public PriorityFuture(RunnableFuture<T> other, int priority) { 
     this.src = other; 
     this.priority = priority; 
    } 

    public int getPriority() { 
     return priority; 
    } 

    public boolean cancel(boolean mayInterruptIfRunning) { 
     return src.cancel(mayInterruptIfRunning); 
    } 

    public boolean isCancelled() { 
     return src.isCancelled(); 
    } 

    public boolean isDone() { 
     return src.isDone(); 
    } 

    public T get() throws InterruptedException, ExecutionException { 
     return src.get(); 
    } 

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 
     return src.get(timeout, unit); 
    } 

    public void run() { 
     src.run(); 
    } 

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() { 
     public int compare(Runnable o1, Runnable o2) { 
      if (o1 == null && o2 == null) 
       return 0; 
      else if (o1 == null) 
       return -1; 
      else if (o2 == null) 
       return 1; 
      else { 
       int p1 = ((PriorityFuture<?>) o1).getPriority(); 
       int p2 = ((PriorityFuture<?>) o2).getPriority(); 

       return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1); 
      } 
     } 
    }; 
} 

E

public interface PriorityCallable<T> extends Callable<T> { 

    int getPriority(); 

} 

E questo metodo di supporto:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) { 
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, 
      new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) { 

     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 
      RunnableFuture<T> newTaskFor = super.newTaskFor(callable); 
      return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority()); 
     } 
    }; 
} 

E quindi utilizzare in questo modo:

class LenthyJob implements PriorityCallable<Long> { 
    private int priority; 

    public LenthyJob(int priority) { 
     this.priority = priority; 
    } 

    public Long call() throws Exception { 
     System.out.println("Executing: " + priority); 
     long num = 1000000; 
     for (int i = 0; i < 1000000; i++) { 
      num *= Math.random() * 1000; 
      num /= Math.random() * 1000; 
      if (num == 0) 
       num = 1000000; 
     } 
     return num; 
    } 

    public int getPriority() { 
     return priority; 
    } 
} 

public class TestPQ { 

    public static void main(String[] args) throws InterruptedException, ExecutionException { 
     ThreadPoolExecutor exec = getPriorityExecutor(2); 

     for (int i = 0; i < 20; i++) { 
      int priority = (int) (Math.random() * 100); 
      System.out.println("Scheduling: " + priority); 
      LenthyJob job = new LenthyJob(priority); 
      exec.submit(job); 
     } 
    } 
} 
+0

@Snicolas la modifica che hai apportato non viene compilata. – assylias

+0

@assyslias, quale versione JDK usi? – Snicolas

+0

Solo una parola per dire questa risposta è buona. – Snicolas

3

Cercherò di spiegare questo problema con un codice completamente funzionale. Ma prima di tuffarsi nel codice Vorrei spiegare circa PriorityBlockingQueue

PriorityBlockingQueue: PriorityBlockingQueue è un'implementazione del BlockingQueue. Accetta le attività insieme alla loro priorità e invia l'attività con la massima priorità per l'esecuzione. Se due attività hanno la stessa priorità, è necessario fornire alcune regole personalizzate per decidere quale attività deve essere eseguita per prima.

Ora consente di accedere al codice immediatamente.

Classe driver: questa classe crea un executor che accetta le attività e successivamente le invia per l'esecuzione. Qui creiamo due task uno con priorità LOW e l'altro con priorità HIGH. Qui diciamo all'esecutore di eseguire un MAX di 1 thread e utilizzare PriorityBlockingQueue.

 public static void main(String[] args) { 

     /* 
     Minimum number of threads that must be running : 0 
     Maximium number of threads that can be created : 1 
     If a thread is idle, then the minimum time to keep it alive : 1000 
     Which queue to use : PriorityBlockingQueue 
     */ 
    PriorityBlockingQueue queue = new PriorityBlockingQueue(); 
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1, 
     1000, TimeUnit.MILLISECONDS,queue); 


    MyTask task = new MyTask(Priority.LOW,"Low"); 
    executor.execute(new MyFutureTask(task)); 
    task = new MyTask(Priority.HIGH,"High"); 
    executor.execute(new MyFutureTask(task)); 
    task = new MyTask(Priority.MEDIUM,"Medium"); 
    executor.execute(new MyFutureTask(task)); 

} 

classe MyTask: MyTask implementa Runnable e accetta come argomento prioritario nel costruttore. Quando viene eseguita questa attività, stampa un messaggio e quindi mette il thread in stop per 1 secondo.

public class MyTask implements Runnable { 

    public int getPriority() { 
    return priority.getValue(); 
    } 

    private Priority priority; 

    public String getName() { 
    return name; 
    } 

    private String name; 

    public MyTask(Priority priority,String name){ 
    this.priority = priority; 
    this.name = name; 
    } 

    @Override 
    public void run() { 
    System.out.println("The following Runnable is getting executed "+getName()); 
    try { 
     Thread.sleep(1000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    } 

} 

classe MyFutureTask: Dal momento che stiamo usando PriorityBlocingQueue per tenere i nostri compiti, i nostri compiti deve essere avvolto all'interno FutureTask e la nostra implementazione di FutureTask deve implementare l'interfaccia Comparable. L'interfaccia Comparable confronta la priorità di 2 diverse attività e invia l'attività con la massima priorità per l'esecuzione.

public class MyFutureTask extends FutureTask<MyFutureTask> 
     implements Comparable<MyFutureTask> { 

    private MyTask task = null; 

    public MyFutureTask(MyTask task){ 
     super(task,null); 
     this.task = task; 
    } 

    @Override 
    public int compareTo(MyFutureTask another) { 
     return task.getPriority() - another.task.getPriority(); 
    } 
    } 

Classe di priorità: Si spiega da sé Classe di priorità.

public enum Priority { 

    HIGHEST(0), 
    HIGH(1), 
    MEDIUM(2), 
    LOW(3), 
    LOWEST(4); 

    int value; 

    Priority(int val) { 
    this.value = val; 
    } 

    public int getValue(){ 
    return value; 
    } 


} 

Ora, quando si corre questo esempio, si ottiene il seguente output

The following Runnable is getting executed High 
The following Runnable is getting executed Medium 
The following Runnable is getting executed Low 

Anche se abbiamo presentato la bassa priorità prima, ma ad alto compito prioritario tardi, ma dal momento che stiamo utilizzando un PriorityBlockingQueue, qualsiasi l'attività con una priorità più alta verrà eseguita per prima.

+0

Crea attività High1, High2, Low1, Low2, Low3 e la loro esecuzione viene mischiata all'interno del livello di priorità. Soluzione necessaria per preservare l'ordine di invio per le stesse attività prioritarie Aggiornamento –

1

La mia soluzione conserva l'ordine di sottomissione di attività per le stesse priorità. Si tratta di un miglioramento di questo answer

Task ordine di esecuzione si basa su:

  1. Priorità
  2. Submit fine (entro stessa priorità)

Tester classe:

public class Main { 

    public static void main(String[] args) throws InterruptedException, ExecutionException { 

     ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1); 

     //Priority=0 
     executorService.submit(newCallable("A1", 200));  //Defaults to priority=0 
     executorService.execute(newRunnable("A2", 200)); //Defaults to priority=0 
     executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0)); 
     executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0)); 
     executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0)); 
     executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0)); 
     executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0)); 
     executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0)); 

     //Priority=1 
     executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1)); 
     executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1)); 
     executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1)); 
     executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1)); 
     executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1)); 

     executorService.shutdown(); 

    } 

    private static Runnable newRunnable(String name, int delay) { 
     return new Runnable() { 
      @Override 
      public void run() { 
       System.out.println(name); 
       sleep(delay); 
      } 
     }; 
    } 

    private static Callable<Integer> newCallable(String name, int delay) { 
     return new Callable<Integer>() { 
      @Override 
      public Integer call() throws Exception { 
       System.out.println(name); 
       sleep(delay); 
       return 10; 
      } 
     }; 
    } 

    private static void sleep(long millis) { 
     try { 
      Thread.sleep(millis); 
     } catch (InterruptedException e) { 
      Thread.currentThread().interrupt(); 
      throw new RuntimeException(e); 
     } 
    } 

} 

Risultato:

A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8

primo compito è A1 perché non c'erano priorità più alta nella coda quando è stato inserito. task B sono 1 priorità così eseguito in precedenza, compiti A sono 0 priorità così eseguito successivamente, ma l'ordine di esecuzione è seguente ordine sottomissione: B1, B2, B3, ... A2, A3, A4 ...

La soluzione:

public class PriorityExecutors { 

    public static ExecutorService newFixedThreadPool(int nThreads) { 
     return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS); 
    } 

    private static class PriorityExecutor extends ThreadPoolExecutor { 
     private static final int DEFAULT_PRIORITY = 0; 
     private static AtomicLong instanceCounter = new AtomicLong(); 

     @SuppressWarnings({"unchecked"}) 
     public PriorityExecutor(int corePoolSize, int maximumPoolSize, 
       long keepAliveTime, TimeUnit unit) { 
      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10, 
        ComparableTask.comparatorByPriorityAndSequentialOrder())); 
     } 

     @Override 
     public void execute(Runnable command) { 
      // If this is ugly then delegator pattern needed 
      if (command instanceof ComparableTask) //Already wrapped 
       super.execute(command); 
      else { 
       super.execute(newComparableRunnableFor(command)); 
      } 
     } 

     private Runnable newComparableRunnableFor(Runnable runnable) { 
      return new ComparableRunnable(ensurePriorityRunnable(runnable)); 
     } 

     @Override 
     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 
      return new ComparableFutureTask<>(ensurePriorityCallable(callable)); 
     } 

     @Override 
     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 
      return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value); 
     } 

     private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) { 
      return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable 
        : PriorityCallable.of(callable, DEFAULT_PRIORITY); 
     } 

     private PriorityRunnable ensurePriorityRunnable(Runnable runnable) { 
      return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable 
        : PriorityRunnable.of(runnable, DEFAULT_PRIORITY); 
     } 

     private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask { 
      private Long sequentialOrder = instanceCounter.getAndIncrement(); 
      private HasPriority hasPriority; 

      public ComparableFutureTask(PriorityCallable<T> priorityCallable) { 
       super(priorityCallable); 
       this.hasPriority = priorityCallable; 
      } 

      public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) { 
       super(priorityRunnable, result); 
       this.hasPriority = priorityRunnable; 
      } 

      @Override 
      public long getInstanceCount() { 
       return sequentialOrder; 
      } 

      @Override 
      public int getPriority() { 
       return hasPriority.getPriority(); 
      } 
     } 

     private static class ComparableRunnable implements Runnable, ComparableTask { 
      private Long instanceCount = instanceCounter.getAndIncrement(); 
      private HasPriority hasPriority; 
      private Runnable runnable; 

      public ComparableRunnable(PriorityRunnable priorityRunnable) { 
       this.runnable = priorityRunnable; 
       this.hasPriority = priorityRunnable; 
      } 

      @Override 
      public void run() { 
       runnable.run(); 
      } 

      @Override 
      public int getPriority() { 
       return hasPriority.getPriority(); 
      } 

      @Override 
      public long getInstanceCount() { 
       return instanceCount; 
      } 
     } 

     private interface ComparableTask extends Runnable { 
      int getPriority(); 

      long getInstanceCount(); 

      public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() { 
       return (o1, o2) -> { 
        int priorityResult = o2.getPriority() - o1.getPriority(); 
        return priorityResult != 0 ? priorityResult 
          : (int) (o1.getInstanceCount() - o2.getInstanceCount()); 
       }; 
      } 

     } 

    } 

    private static interface HasPriority { 
     int getPriority(); 
    } 

    public interface PriorityCallable<V> extends Callable<V>, HasPriority { 

     public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) { 
      return new PriorityCallable<V>() { 
       @Override 
       public V call() throws Exception { 
        return callable.call(); 
       } 

       @Override 
       public int getPriority() { 
        return priority; 
       } 
      }; 
     } 
    } 

    public interface PriorityRunnable extends Runnable, HasPriority { 

     public static PriorityRunnable of(Runnable runnable, int priority) { 
      return new PriorityRunnable() { 
       @Override 
       public void run() { 
        runnable.run(); 
       } 

       @Override 
       public int getPriority() { 
        return priority; 
       } 
      }; 
     } 
    } 

} 
+0

: non funziona insieme all'ascoltoDecorator di Guava .... :( –

Problemi correlati