2013-01-10 30 views
14

Qualcuno sa se c'è qualche implementazione fermo che fa il seguente:Fermo che può essere incrementato

  • ha un metodo per diminuire il valore di chiusura, o attendere se il valore è zero
  • ha un metodo per l'attesa per il valore latch a zero
  • ha un metodo per aggiungere un numero al valore del latch

risposta

4

Invece di ricominciare da AQS, è possibile utilizzare un'implementazione semplice come di seguito. È un po 'ingenuo (è algoritmi sincronizzati contro AQS lock-free) ma a meno che non ci si aspetti di usarlo in uno scenario contento potrebbe essere abbastanza buono.

public class CountUpAndDownLatch { 
    private CountDownLatch latch; 
    private final Object lock = new Object(); 

    public CountUpAndDownLatch(int count) { 
     this.latch = new CountDownLatch(count); 
    } 

    public void countDownOrWaitIfZero() throws InterruptedException { 
     synchronized(lock) { 
      while(latch.getCount() == 0) { 
       lock.wait(); 
      } 
      latch.countDown(); 
      lock.notifyAll(); 
     } 
    } 

    public void waitUntilZero() throws InterruptedException { 
     synchronized(lock) { 
      while(latch.getCount() != 0) { 
       lock.wait(); 
      } 
     } 
    } 

    public void countUp() { //should probably check for Integer.MAX_VALUE 
     synchronized(lock) { 
      latch = new CountDownLatch((int) latch.getCount() + 1); 
      lock.notifyAll(); 
     } 
    } 

    public int getCount() { 
     synchronized(lock) { 
      return (int) latch.getCount(); 
     } 
    } 
} 

Nota: non ho ancora testato in profondità ma sembra comportarsi come previsto:

public static void main(String[] args) throws InterruptedException { 
    final CountUpAndDownLatch latch = new CountUpAndDownLatch(1); 
    Runnable up = new Runnable() { 
     @Override 
     public void run() { 
      try { 
       System.out.println("IN UP " + latch.getCount()); 
       latch.countUp(); 
       System.out.println("UP " + latch.getCount()); 
      } catch (InterruptedException ex) { 
      } 
     } 
    }; 

    Runnable downOrWait = new Runnable() { 
     @Override 
     public void run() { 
      try { 
       System.out.println("IN DOWN " + latch.getCount()); 
       latch.countDownOrWaitIfZero(); 
       System.out.println("DOWN " + latch.getCount()); 
      } catch (InterruptedException ex) { 
      } 
     } 
    }; 

    Runnable waitFor0 = new Runnable() { 
     @Override 
     public void run() { 
      try { 
       System.out.println("WAIT FOR ZERO " + latch.getCount()); 
       latch.waitUntilZero(); 
       System.out.println("ZERO " + latch.getCount()); 
      } catch (InterruptedException ex) { 
      } 
     } 
    }; 
    new Thread(waitFor0).start(); 
    up.run(); 
    downOrWait.run(); 
    Thread.sleep(100); 
    downOrWait.run(); 
    new Thread(up).start(); 
    downOrWait.run(); 
} 

uscita:

IN UP 1 
UP 2 
WAIT FOR ZERO 1 
IN DOWN 2 
DOWN 1 
IN DOWN 1 
ZERO 0 
DOWN 0 
IN DOWN 0 
IN UP 0 
DOWN 0 
UP 0 
+0

Sì, funzionerebbe. Ma vorrei qualcosa che non utilizza 2 oggetti di sincronizzazione per ottenere questo. Ne ho implementato uno usando LockSupport e sto attualmente testandolo per vedere se funziona e va bene (AQS non sembra essere buono neanche per questo). – Razvi

+0

@ dead10ck perché no? – assylias

4

java.util.concurrent.Semaphore sembra perfetto.

  • acquire() o acquisire (n)
  • anche acquisire() (non sicuro di aver capito quale sia la differenza qui) (*)
  • release() o rilascio (n)

(*) OK, non è disponibile alcun metodo incorporato per attendere che il semaforo diventi non disponibile. Suppongo che scriveresti il ​​tuo wrapper per acquire che faccia prima un tryAcquire e che, se fallisce, attiva il tuo "evento occupato" (e continua a usare il normale acquire). Tutti dovrebbero chiamare il tuo wrapper. Forse sottoclasse Semaphore?

+0

è quasi buono. Ma non vedo l'ora che sia zero da quello che so. – Razvi

+0

anche acquisire() è lo stesso dell'acquisizione (1) – Razvi

+0

@assylias: Grazie, aggiornato (prendo solo ciò che Google mi dà quando cerco il nome della classe) – Thilo

0

avevo bisogno di uno e costruito utilizzando la stessa strategia come CountDownLatch che usa AQS (non-blocking), questa classe è anche molto simile (Se non esatta) a quella creata per Apache Camel, penso che sia anche più leggera di JDK Phaser, questo agirà proprio come CountDownLact da JDK, it wo non ti consente di eseguire il conto alla rovescia sotto zero e ti consentirà di eseguire il conto alla rovescia e in alto:

import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class CountingLatch 
{ 
    /** 
    * Synchronization control for CountingLatch. 
    * Uses AQS state to represent count. 
    */ 
    private static final class Sync extends AbstractQueuedSynchronizer 
    { 
    private Sync() 
    { 
    } 

    private Sync(final int initialState) 
    { 
     setState(initialState); 
    } 

    int getCount() 
    { 
     return getState(); 
    } 

    protected int tryAcquireShared(final int acquires) 
    { 
     return getState()==0 ? 1 : -1; 
    } 

    protected boolean tryReleaseShared(final int delta) 
    { 
     // Decrement count; signal when transition to zero 
     for(; ;){ 
     final int c=getState(); 
     final int nextc=c+delta; 
     if(nextc<0){ 
      return false; 
     } 
     if(compareAndSetState(c,nextc)){ 
      return nextc==0; 
     } 
     } 
    } 
    } 

    private final Sync sync; 

    public CountingLatch() 
    { 
    sync=new Sync(); 
    } 

    public CountingLatch(final int initialCount) 
    { 
    sync=new Sync(initialCount); 
    } 

    public void increment() 
    { 
    sync.releaseShared(1); 
    } 

    public int getCount() 
    { 
    return sync.getCount(); 
    } 

    public void decrement() 
    { 
    sync.releaseShared(-1); 
    } 

    public void await() throws InterruptedException 
    { 
    sync.acquireSharedInterruptibly(1); 
    } 

    public boolean await(final long timeout) throws InterruptedException 
    { 
    return sync.tryAcquireSharedNanos(1,TimeUnit.MILLISECONDS.toNanos(timeout)); 
    } 
} 
1

Per coloro che necessitano di una soluzione basata su AQS, ecco uno che ha funzionato per me:

public class CountLatch { 

    private class Sync extends AbstractQueuedSynchronizer { 
     private static final long serialVersionUID = 1L; 

     public Sync() { 
     } 

     @Override 
     protected int tryAcquireShared(int arg) { 
      return count.get() == releaseValue ? 1 : -1; 
     } 

     @Override 
     protected boolean tryReleaseShared(int arg) { 
      return true; 
     } 
    } 

    private final Sync sync; 
    private final AtomicLong count; 
    private volatile long releaseValue; 

    public CountLatch(final long initial, final long releaseValue) { 
     this.releaseValue = releaseValue; 
     this.count = new AtomicLong(initial); 
     this.sync = new Sync(); 
    } 

    public void await() throws InterruptedException { 
     sync.acquireSharedInterruptibly(1); 
    } 

    public long countUp() { 
     final long current = count.incrementAndGet(); 
     if (current == releaseValue) { 
      sync.releaseShared(0); 
     } 
     return current; 
    } 

    public long countDown() { 
     final long current = count.decrementAndGet(); 
     if (current == releaseValue) { 
      sync.releaseShared(0); 
     } 
     return current; 
    } 

    public long getCount() { 
     return count.get(); 
    } 
} 

si inizializza il sincronizzatore, con un valore iniziale e la destinazione. Una volta raggiunto il valore target (eseguendo il conteggio su e/o giù), i thread in attesa verranno rilasciati.

30

Si potrebbe anche usare un Phaser (java.util.concurrent.Phaser)

final Phaser phaser = new Phaser(1); // register self 
while (/* some condition */) { 
    phaser.register(); // Equivalent to countUp 
    // do some work asynchronously, invoking 
    // phaser.arriveAndDeregister() (equiv to countDown) in a finally block 
} 
phaser.arriveAndAwaitAdvance(); // await any async tasks to complete 

Spero che questo aiuta.

+1

Ho trovato questo molto più flessibile di qualsiasi altra opzione. – donlys

0

Questa è una variazione su CounterLatch, disponibile dal sito Apache.

La loro versione, per ragioni a loro più note, blocca il thread del chiamante mentre la variabile (AtomicInteger) ha un valore determinato.

Ma è l'altezza della facilità di modificare questo codice in modo che sia possibile scegliere solo la versione di Apache o ... "attendere qui fino al il contatore raggiunge un determinato valore". Probabilmente quest'ultimo avrà maggiore applicabilità. Nel mio caso particolare ho frastagliato questo perché volevo controllare che tutti i "pezzi" fossero stati pubblicati in SwingWorker.process() ... ma da allora ho trovato altri usi per questo.

Qui è scritto in Jython, ufficialmente la migliore lingua del mondo (TM). A un certo punto arriverò a dare una versione Java.

class CounterLatch(): 
    def __init__(self, initial = 0, wait_value = 0, lift_on_reached = True): 
     self.count = java.util.concurrent.atomic.AtomicLong(initial) 
     self.signal = java.util.concurrent.atomic.AtomicLong(wait_value) 

     class Sync(java.util.concurrent.locks.AbstractQueuedSynchronizer): 
      def tryAcquireShared(sync_self, arg): 
       if lift_on_reached: 
        return -1 if ((not self.released.get()) and self.count.get() != self.signal.get()) else 1 
       else: 
        return -1 if ((not self.released.get()) and self.count.get() == self.signal.get()) else 1 
      def tryReleaseShared(self, args): 
       return True 

     self.sync = Sync() 
     self.released = java.util.concurrent.atomic.AtomicBoolean() # initialised at False 

    def await(self, *args): 
     if args: 
      assert len(args) == 2 
      assert type(args[ 0 ]) is int 
      timeout = args[ 0 ] 
      assert type(args[ 1 ]) is java.util.concurrent.TimeUnit 
      unit = args[ 1 ] 
      return self.sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)) 
     else: 
      self.sync.acquireSharedInterruptibly(1) 

    def count_relative(self, n): 
     previous = self.count.addAndGet(n) 
     if previous == self.signal.get(): 
      self.sync.releaseShared(0) 
     return previous 

NB la versione di Apache utilizza la parola chiave per volatilesignal e released. In Jython non penso che esista come tale, ma usando AtomicInteger e AtomicBoolean assicuriamo che nessun valore sia "non aggiornato" in nessun thread.

Esempio Utilizzo:

Nel costruttore SwingWorker:

self.publication_counter_latch = CounterLatch() 

In SW.publish:

# increase counter value BEFORE publishing chunks 
self.publication_counter_latch.count_relative(len(chunks)) 
self.super__publish(chunks) 

In SW.process:

# ... do sthg [HERE] with the chunks! 
# AFTER having done what you want to do with your chunks: 
self.publication_counter_latch.count_relative(- len(chunks)) 

Nel thread in attesa dell'elaborazione del blocco t o di arresto:

worker.publication_counter_latch.await() 
0

Mi sembra un CountDownLatch farà come si desidera:

Un CountDownLatch viene inizializzato con un dato numero. Attendere i metodi blocco fino a che il conteggio corrente non raggiunge zero a causa di invocazioni del metodo countDown(), dopo il quale tutti i thread in attesa vengono rilasciati e eventuali richiami successivi di attesa restituiscono immediatamente. Questo è un fenomeno one-shot - il conteggio non può essere resettato. Se è necessaria una versione che reimposta il conteggio, prendere in considerazione l'uso di CyclicBarrier.

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html

Problemi correlati