2012-05-04 12 views
15

Ho più thread di consumo in attesa su un CountDownLatch di dimensione 1 utilizzando await(). Ho un thread singolo produttore che chiama countDown() quando termina correttamente.Come si "annulla" un CountDownLatch?

Questo funziona perfettamente quando non ci sono errori.

Tuttavia, se il produttore rileva un errore, mi piacerebbe che fosse in grado di segnalare l'errore ai thread dei consumatori. Idealmente, potrei fare in modo che il produttore chiami qualcosa come abortCountDown() e che tutti i consumatori ricevano un'interruzione di interruzione o qualche altra eccezione. Non voglio chiamare countDown(), perché ciò richiede che tutti i thread dei consumatori eseguano un controllo manuale aggiuntivo per il successo dopo la loro chiamata a await(). Preferisco semplicemente ricevere un'eccezione, che sanno già come gestire.

So che una funzione di interruzione non è disponibile in CountDownLatch. Esiste un'altra primitiva di sincronizzazione che posso facilmente adattare per creare efficacemente uno CountDownLatch che supporti l'annullamento del conto alla rovescia?

risposta

11

JB Nizet ha avuto un'ottima risposta. Ho preso il suo e lucidato un po '. Il risultato è una sottoclasse di CountDownLatch chiamata AbortableCountDownLatch, che aggiunge un metodo "abort()" alla classe che farà sì che tutti i thread in attesa sul latch ricevano una AbortException (una sottoclasse di InterruptedException).

Inoltre, a differenza della classe di JB, AbortableCountDownLatch interromperà immediatamente tutti i thread di blocco in caso di interruzione, piuttosto che attendere che il conto alla rovescia raggiunga lo zero (per le situazioni in cui si utilizza un conteggio> 1).

import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.TimeUnit; 

public class AbortableCountDownLatch extends CountDownLatch { 
    protected boolean aborted = false; 

    public AbortableCountDownLatch(int count) { 
     super(count); 
    } 


    /** 
    * Unblocks all threads waiting on this latch and cause them to receive an 
    * AbortedException. If the latch has already counted all the way down, 
    * this method does nothing. 
    */ 
    public void abort() { 
     if(getCount()==0) 
      return; 

     this.aborted = true; 
     while(getCount()>0) 
      countDown(); 
    } 


    @Override 
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException { 
     final boolean rtrn = super.await(timeout,unit); 
     if (aborted) 
      throw new AbortedException(); 
     return rtrn; 
    } 

    @Override 
    public void await() throws InterruptedException { 
     super.await(); 
     if (aborted) 
      throw new AbortedException(); 
    } 


    public static class AbortedException extends InterruptedException { 
     public AbortedException() { 
     } 

     public AbortedException(String detailMessage) { 
      super(detailMessage); 
     } 
    } 
} 
+0

Come utilizzare questa classe, La mia situazione è che ho una lista e la lista si aggiorna dinamicamente in tempo reale. C'è un evento automatico in lista su cui devo aspettare 2 minuti ma se tra il tempo di attesa è arrivato il Manuale Utente Event nella lista devo interrompere l'attesa e procedere immediatamente. –

12

Incapsula questo comportamento all'interno di una specifica classe di livello superiore, utilizzando il CountDownLatch internamente:

public class MyLatch { 
    private CountDownLatch latch; 
    private boolean aborted; 
    ... 

    // called by consumers 
    public void await() throws AbortedException { 
     latch.await(); 
     if (aborted) { 
      throw new AbortedException(); 
     } 
    } 

    // called by producer 
    public void abort() { 
     this.aborted = true; 
     latch.countDown(); 
    } 

    // called by producer 
    public void succeed() { 
     latch.countDown(); 
    } 
} 
+1

Per rendere questo thread-safe, non si dovrebbe "abortire" essere "volatile"? –

+5

No, perché la sicurezza del thread è garantita dal CountDownLatch: il metodo countDown assicura che vi sia una barriera di memoria tra i thread. Javadoc dice: "Le azioni in una discussione prima di chiamare countDown() avvengono - prima delle azioni che seguono un ritorno positivo da un attendente corrispondente()" –

+0

Sì, ora mi ricordo di averlo letto (qualche tempo fa). Grazie. –

3

È possibile creare un wrapper CountDownLatch che offre la possibilità di cancellare i camerieri. Dovrà tenere traccia dei thread in attesa e rilasciarli quando scadono, oltre a ricordare che il latch è stato cancellato, quindi le future chiamate a await si interromperanno immediatamente.

public class CancellableCountDownLatch 
{ 
    final CountDownLatch latch; 
    final List<Thread> waiters; 
    boolean cancelled = false; 

    public CancellableCountDownLatch(int count) { 
     latch = new CountDownLatch(count); 
     waiters = new ArrayList<Thread>(); 
    } 

    public void await() throws InterruptedException { 
     try { 
      addWaiter(); 
      latch.await(); 
     } 
     finally { 
      removeWaiter(); 
     } 
    } 

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException { 
     try { 
      addWaiter(); 
      return latch.await(timeout, unit); 
     } 
     finally { 
      removeWaiter(); 
     } 
    } 

    private synchronized void addWaiter() throws InterruptedException { 
     if (cancelled) { 
      Thread.currentThread().interrupt(); 
      throw new InterruptedException("Latch has already been cancelled"); 
     } 
     waiters.add(Thread.currentThread()); 
    } 

    private synchronized void removeWaiter() { 
     waiters.remove(Thread.currentThread()); 
    } 

    public void countDown() { 
     latch.countDown(); 
    } 

    public synchronized void cancel() { 
     if (!cancelled) { 
      cancelled = true; 
      for (Thread waiter : waiters) { 
       waiter.interrupt(); 
      } 
      waiters.clear(); 
     } 
    } 

    public long getCount() { 
     return latch.getCount(); 
    } 

    @Override 
    public String toString() { 
     return latch.toString(); 
    } 
} 
+0

Puoi spiegare con l'esempio come usare lo stesso. La mia situazione è che ho una lista e la lista si aggiorna dinamicamente in tempo reale. C'è un evento automatico in lista su cui devo aspettare 2 minuti ma se tra il tempo di attesa è arrivato il Manuale Utente Event nella lista devo interrompere l'attesa e procedere immediatamente. –

0

Si potrebbe rotolare il proprio CountDownLatch utilizzando un ReentrantLock che consente l'accesso alla sua getWaitingThreads metodo protetto.

Esempio:

public class FailableCountDownLatch { 
    private static class ConditionReentrantLock extends ReentrantLock { 
     private static final long serialVersionUID = 2974195457854549498L; 

     @Override 
     public Collection<Thread> getWaitingThreads(Condition c) { 
      return super.getWaitingThreads(c); 
     } 
    } 

    private final ConditionReentrantLock lock = new ConditionReentrantLock(); 
    private final Condition countIsZero = lock.newCondition(); 
    private long count; 

    public FailableCountDownLatch(long count) { 
     this.count = count; 
    } 

    public void await() throws InterruptedException { 
     lock.lock(); 
     try { 
      if (getCount() > 0) { 
       countIsZero.await(); 
      } 
     } finally { 
      lock.unlock(); 
     } 
    } 

    public boolean await(long time, TimeUnit unit) throws InterruptedException { 
     lock.lock(); 
     try { 
      if (getCount() > 0) { 
       return countIsZero.await(time, unit); 
      } 
     } finally { 
      lock.unlock(); 
     } 
     return true; 
    } 

    public long getCount() { 
     lock.lock(); 
     try { 
      return count; 
     } finally { 
      lock.unlock(); 
     } 
    } 

    public void countDown() { 
     lock.lock(); 
     try { 
      if (count > 0) { 
       count--; 

       if (count == 0) { 
        countIsZero.signalAll(); 
       } 
      } 
     } finally { 
      lock.unlock(); 
     } 
    } 

    public void abortCountDown() { 
     lock.lock(); 
     try { 
      for (Thread t : lock.getWaitingThreads(countIsZero)) { 
       t.interrupt(); 
      } 
     } finally { 
      lock.unlock(); 
     } 
    } 
} 

Si consiglia di cambiare questa classe di gettare un InterruptedException sulle nuove chiamate a await dopo che è stato annullato. Potresti anche avere questa classe estendere CountDownLatch se hai bisogno di quella funzionalità.

Problemi correlati