2012-05-24 10 views
6

Provo ad implementare un ConcurrentDictionary avvolgendolo in BlockingCollection ma non sembra aver avuto successo.Come includere ConcurrentDictionary in BlockingCollection?

Capisco che una dichiarazione delle variabili lavorano con BlockingCollection quali ConcurrentBag<T>, ConcurrentQueue<T>, ecc

Così, per creare un ConcurrentBag avvolta in un BlockingCollection vorrei dichiarare e creare un'istanza in questo modo:

BlockingCollection<int> bag = new BlockingCollection<int>(new ConcurrentBag<int>()); 

ma come farlo per ConcurrentDictionary? Ho bisogno della funzionalità di blocco di BlockingCollection sul lato produttore e consumatore.

+0

Anche il dizionario (e ConcurrentDictionary) non conserva l'ordine degli elementi. Puoi descrivere il tuo scenario produttore-consumatore? – Dennis

+0

@Dennis, ne sono consapevole. Un produttore memorizza KeyValuePairs nel concurrentDictionary e un'attività del cliente incrementa un int e rimuove KeyValuePair se l'int corrisponde con la rispettiva chiave. Lo faccio perché le attività worker popolano il concurrentDictionary con valori ma in ordine arbitrario, l'attività consumer assicura che i valori ricevuti vengano passati/lavorati nell'ordine corretto. Un ConcurrentDictionary può essere incluso in BlockingCollection? –

+0

Quale soluzione ti è venuta in mente? Sto cercando di trovare una buona soluzione per un problema simile in cui il produttore non produce articoli nell'ordine richiesto dal consumatore. (vecchio post lo so, ma vale la pena provare) – Kim

risposta

1

Avrai bisogno di scrivere una classe adattatore - qualcosa come:

public class ConcurrentDictionaryWrapper<TKey,TValue> : IProducerConsumerCollection<KeyValuePair<TKey,TValue>> 
{ 
    private ConcurrentDictionary<TKey, TValue> dictionary; 

    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator() 
    { 
     return dictionary.GetEnumerator(); 
    } 

    IEnumerator IEnumerable.GetEnumerator() 
    { 
     return GetEnumerator(); 
    } 

    public void CopyTo(Array array, int index) 
    { 
     throw new NotImplementedException(); 
    } 

    public int Count 
    { 
     get { return dictionary.Count; } 
    } 

    public object SyncRoot 
    { 
     get { return this; } 
    } 

    public bool IsSynchronized 
    { 
     get { return true; } 
    } 

    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index) 
    { 
     throw new NotImplementedException(); 
    } 

    public bool TryAdd(KeyValuePair<TKey, TValue> item) 
    { 
     return dictionary.TryAdd(item.Key, item.Value); 
    } 

    public bool TryTake(out KeyValuePair<TKey, TValue> item) 
    { 
     item = dictionary.FirstOrDefault(); 
     TValue value; 
     return dictionary.TryRemove(item.Key, out value); 
    } 

    public KeyValuePair<TKey, TValue>[] ToArray() 
    { 
     throw new NotImplementedException(); 
    } 
} 
+1

Grazie per il suggerimento sul codice. Ma il mio scopo principale nell'uso di BlockingCollection era la possibilità di contrassegnare la raccolta come Adding Completed e di controllarne lo stato, nonché se la sua aggiunta fosse completa e vuota, in modo simile a quanto fornito da BlockingCollection. Sono consapevole di poter aggiungere facilmente tale funzionalità, ma sto cercando un suggerimento su come farlo direttamente attraverso BlockingCollection. Finora non vedo un motivo per cui non possa funzionare direttamente attraverso la raccolta di Blocking. Forse ci vuole solo IProducerConsumerCollection ? –

4

forse avete bisogno di un dizionario concomitante di BlockingCollection

 ConcurrentDictionary<int, BlockingCollection<string>> mailBoxes = new ConcurrentDictionary<int, BlockingCollection<string>>(); 
     int maxBoxes = 5; 

     CancellationTokenSource cancelationTokenSource = new CancellationTokenSource(); 
     CancellationToken cancelationToken = cancelationTokenSource.Token; 

     Random rnd = new Random(); 
     // Producer 
     Task.Factory.StartNew(() => 
     { 
      while (true) 
      { 
       int index = rnd.Next(0, maxBoxes); 
       // put the letter in the mailbox 'index' 
       var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>()); 
       box.Add("some message " + index, cancelationToken); 
       Console.WriteLine("Produced a letter to put in box " + index); 

       // Wait simulating a heavy production item. 
       Thread.Sleep(1000); 
      } 
     }); 

     // Consumer 1 
     Task.Factory.StartNew(() => 
     { 
      while (true) 
      { 
       int index = rnd.Next(0, maxBoxes); 
       // get the letter in the mailbox 'index' 
       var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>()); 
       var message = box.Take(cancelationToken); 
       Console.WriteLine("Consumed 1: " + message); 

       // consume a item cost less than produce it: 
       Thread.Sleep(50); 
      } 
     }); 

     // Consumer 2 
     Task.Factory.StartNew(() => 
     { 
      while (true) 
      { 
       int index = rnd.Next(0, maxBoxes); 
       // get the letter in the mailbox 'index' 
       var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>()); 
       var message = box.Take(cancelationToken); 
       Console.WriteLine("Consumed 2: " + message); 

       // consume a item cost less than produce it: 
       Thread.Sleep(50); 
      } 
     }); 

     Console.ReadLine(); 
     cancelationTokenSource.Cancel(); 

In questo modo, un consumatore che si aspettava qualcosa di nella casella di posta 5, attenderà che il prodotto inserisca una lettera nella casella di posta 5.

Problemi correlati