2010-12-29 9 views
9

Ho un precedente question che ho fornito la mia soluzione; tuttavia, non ho accesso a ConcurrentQueue<T> da quando sono su .Net 3.5. Ho bisogno di Queue<T> per consentire la concorrenza. Ho letto questo question e sembra presentare un problema se un articolo è non nella coda e il metodo filettato tenta di desquare un elemento.Abilitazione coda <T> con concomitanza

Il mio compito ora è determinare se posso derivare la mia classe di coda concorrente. Questo è ciò che mi si avvicinò con:

public sealed class ConcurrentQueue : Queue<DataTable> 
{ 
    public event EventHandler<TableQueuedEventArgs> TableQueued; 
    private ICollection que; 

    new public void Enqueue(DataTable Table) 
    { 
     lock (que.SyncRoot) 
     { 
      base.Enqueue(Table); 
     } 

     OnTableQueued(new TableQueuedEventArgs(Dequeue())); 
    } 

    // this is where I think I will have a problem... 
    new public DataTable Dequeue() 
    { 
     DataTable table; 

     lock (que.SyncRoot) 
     { 
      table = base.Dequeue(); 
     } 

     return table; 
    } 

    public void OnTableQueued(TableQueuedEventArgs table) 
    { 
     EventHandler<TableQueuedEventArgs> handler = TableQueued; 

     if (handler != null) 
     { 
      handler(this, table); 
     } 
    } 
} 

Così, quando un DataTable è in coda, i EventArgs passerà un tavolo accodamento al sottoscrittore dell'evento. Questa implementazione mi fornirà una coda thread-safe?

+3

'que' è _utilmente_ inutile. Dovresti bloccare un oggetto chiave 'readonly = new object();'. – SLaks

+0

@SLaks: ho implementato 'ICollection que' e' lock (que.SyncRoot) 'basato su MSDN: http://msdn.microsoft.com/en-us/library/bb344892.aspx – IAbstract

+0

Non ne hai bisogno a tutti. 'SyncRoot' è utile se hai pezzi di codice disgiunti che devono essere bloccati per la stessa collezione. Nel tuo caso, 'que' è' null'. Hai solo bisogno di bloccare un singolo oggetto nei tuoi metodi. – SLaks

risposta

2

Stai rimuovendo i tuoi articoli mentre li accodai.
È necessario aumentare l'evento utilizzando il parametro.

Se è effettivamente thread-safe dipende da come lo si utilizza.
Se si controlla lo Count o si verifica la presenza di vuoto, non è protetto da thread e non può essere reso facilmente protetto da thread.
Se non lo fai, probabilmente puoi usare qualcosa di più semplice di una coda.

+0

Si prevede che l'accodamento sia più rapido della rimozione della coda (tuttavia, non ho ancora dati per confermare in entrambi i casi). L'accodamento viene eseguito anche sul thread principale. Quando l'evento si attiva, utilizzo un'azione 'BeginInvoke' per eseguire il processo di rimozione della coda in modo asincrono. – IAbstract

+0

Inoltre ... Non ho mai controllato "Conteggio" e non ho visto la necessità di farlo visto che sto quasi scomparendo quasi immediatamente su un thread di lavoro. Non so cosa sarebbe più semplice di una coda a meno che non andassi a un elenco - cosa che ho quasi fatto, ma la classe Queue sembrava avere tutto ciò di cui avrei bisogno eccetto, ovviamente, per l'evento quando un oggetto è in coda. – IAbstract

+0

Sembra che tu non abbia bisogno di una coda; puoi semplicemente passare la tabella come parametro a 'BeginInvoke'. – SLaks

3

Un breve viaggio verso il mio motore di ricerca preferito ha rivelato che la mia memoria era corretta; you can get the Task Parallel Library even on .NET 3.5. Vedi anche The PFX team blog post on the subject e lo Reactive Extensions che scarichi per ottenere allo System.Threading.dll desiderato.

+0

Sfortunatamente, sono legato (per policy interne) alle librerie di base utilizzate in .Net 3.5 - fondamentalmente mantenendo tutti gli sviluppatori che usano le stesse librerie. Se provo a usare TPLib, sarò contrassegnato come un ladro. Altrimenti, una buona opzione. – IAbstract

+3

Questa è una politica _insane_. Re-inventare la ruota è uno dei peggiori scarichi di produttività e una grande fonte di bug. – SLaks

+4

Questo tipo di politica viene solitamente applicato da persone che non sanno nulla sulla programmazione ... –

0

Nella linea OnTableQueued(new TableQueuedEventArgs(Dequeue())); nel metodo Enqueue

uso Peek invece di Dequeue

Dovrebbe essere

OnTableQueued(new TableQueuedEventArgs(base.Peek()));

+0

Questo non è thread-safe. Hai un parametro; ** usalo! ** – SLaks

3

Il fatto è necessario utilizzare new per nascondere i metodi dalla base la classe indica in genere che è necessario utilizzare la composizione anziché l'ereditarietà ...

Ecco un semplice coda di sincronizzato, che non usa l'ereditarietà, ma ancora si basa sul comportamento dello standard Queue<T>:

public class ConcurrentQueue<T> : ICollection, IEnumerable<T> 
{ 
    private readonly Queue<T> _queue; 

    public ConcurrentQueue() 
    { 
     _queue = new Queue<T>(); 
    } 

    public IEnumerator<T> GetEnumerator() 
    { 
     lock (SyncRoot) 
     { 
      foreach (var item in _queue) 
      { 
       yield return item; 
      } 
     } 
    } 

    IEnumerator IEnumerable.GetEnumerator() 
    { 
     return GetEnumerator(); 
    } 

    public void CopyTo(Array array, int index) 
    { 
     lock (SyncRoot) 
     { 
      ((ICollection)_queue).CopyTo(array, index); 
     } 
    } 

    public int Count 
    { 
     get 
     { 
      // Assumed to be atomic, so locking is unnecessary 
      return _queue.Count; 
     } 
    } 

    public object SyncRoot 
    { 
     get { return ((ICollection)_queue).SyncRoot; } 
    } 

    public bool IsSynchronized 
    { 
     get { return true; } 
    } 

    public void Enqueue(T item) 
    { 
     lock (SyncRoot) 
     { 
      _queue.Enqueue(item); 
     } 
    } 

    public T Dequeue() 
    { 
     lock(SyncRoot) 
     { 
      return _queue.Dequeue(); 
     } 
    } 

    public T Peek() 
    { 
     lock (SyncRoot) 
     { 
      return _queue.Peek(); 
     } 
    } 

    public void Clear() 
    { 
     lock (SyncRoot) 
     { 
      _queue.Clear(); 
     } 
    } 
} 
+0

Cosa significa implementazione esplicita dell'interfaccia e cast su _queue? – abatishchev

+1

'lock'-ing in un iteratore è pericoloso. – SLaks

+1

Inoltre, questo non è ancora sicuro per la maggior parte dei casi d'uso. – SLaks

2

Qualche tempo dopo la domanda iniziale, lo so (questo è venuto in su come "correlate" a destra di un'altra domanda), ma sono andato con il seguente in casi simili. Non è buono per l'uso della cache della CPU come potrebbe essere, ma l'uso semplice della cache, privo di lock, thread-safe e spesso CPU-CPU non è così importante se ci sono spesso grandi lacune tra le operazioni e quando non la vicinanza dell'allocazione potrebbe ridurre l'impatto:

internal sealed class LockFreeQueue<T> 
{ 
    private sealed class Node 
    { 
    public readonly T Item; 
    public Node Next; 
    public Node(T item) 
    { 
     Item = item; 
    } 
    } 
    private volatile Node _head; 
    private volatile Node _tail; 
    public LockFreeQueue() 
    { 
    _head = _tail = new Node(default(T)); 
    } 
#pragma warning disable 420 // volatile semantics not lost as only by-ref calls are interlocked 
    public void Enqueue(T item) 
    { 
    Node newNode = new Node(item); 
    for(;;) 
    { 
     Node curTail = _tail; 
     if (Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null) //append to the tail if it is indeed the tail. 
     { 
     Interlocked.CompareExchange(ref _tail, newNode, curTail); //CAS in case we were assisted by an obstructed thread. 
     return; 
     } 
     else 
     { 
     Interlocked.CompareExchange(ref _tail, curTail.Next, curTail); //assist obstructing thread. 
     } 
    } 
    }  
    public bool TryDequeue(out T item) 
    { 
    for(;;) 
    { 
     Node curHead = _head; 
     Node curTail = _tail; 
     Node curHeadNext = curHead.Next; 
     if (curHead == curTail) 
     { 
     if (curHeadNext == null) 
     { 
      item = default(T); 
      return false; 
     } 
     else 
      Interlocked.CompareExchange(ref _tail, curHeadNext, curTail); // assist obstructing thread 
     } 
     else 
     { 
     item = curHeadNext.Item; 
     if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead) 
     { 
      return true; 
     } 
     } 
    } 
    } 
#pragma warning restore 420 
} 
Problemi correlati