2010-07-19 20 views
7

System.Collections.Concurrent ha alcune nuove raccolte che funzionano molto bene in ambienti con multithreading. Tuttavia, sono un po 'limitati. O bloccano finché un articolo non diventa disponibile, oppure restituiscono default(T) (metodi TryXXX).Raccolta simultanea non bloccante?

Ho bisogno di una raccolta che sia thread-safe, ma invece di bloccare il thread chiamante usa un callback per informarmi che almeno un elemento è disponibile.

La mia soluzione attuale è utilizzare BlockingCollection, ma utilizzare l'APM con un delegato per ottenere l'elemento successivo. In altre parole, creo un delegato su un metodo che è Take dalla raccolta ed eseguo tale delega utilizzando BeginInvoke.

Sfortunatamente, devo mantenere molto stato all'interno della mia classe per raggiungere questo obiettivo. Peggio ancora, la classe non è thread-safe; può essere utilizzato solo da un singolo thread. Sto superando il limite della manutenibilità, che preferirei non fare.

So che ci sono alcune librerie là fuori che rendono quello che sto facendo qui piuttosto semplice (credo che il Reactive Framework sia uno di questi), ma mi piacerebbe realizzare i miei obiettivi senza aggiungere alcun riferimento al di fuori della versione 4 del quadro.

Esistono schemi migliori che posso utilizzare che non richiedono riferimenti esterni che raggiungono il mio obiettivo?


tl; dr:

Ci sono dei modelli che soddisfano il requisito:

"Ho bisogno di segnalare una collezione che sono pronto per l'elemento successivo, e avere la collezione esecuzione una richiamata quando il prossimo elemento è arrivato, senza che nessun thread sia bloccato. "

+0

Questo thread-safe? Che cosa sta bloccando l'elemento disponibile che diventa non disponibile prima che il delegato venga richiamato? E qual è il tuo obiettivo generale (ad es. Un sistema di accodamento)? –

+0

@Adam Buon punto sul consumo dell'elemento. Il delegato prende l'oggetto rimosso dalla raccolta. Quindi l'esecuzione del delegato è bloccata finché un elemento non è 'Take'-en dalla raccolta, e quell'elemento è l'oggetto' passato a EndInvoke. L'obiettivo generale è un po 'contorto; in sostanza devo rallentare un flusso di lavoro finché l'articolo non diventa disponibile. Non è possibile bloccare l'esecuzione del flusso di lavoro, quindi semplicemente "Prendere" un oggetto non funzionerà come i blocchi di chiamata. Devo creare un segnalibro, quindi passare a un'estensione. L'estensione richiama il delegato, riprendendo il segnalibro all'interno del callback. – Will

+0

sfortunatamente ho poca esperienza con i flussi di lavoro - prova ad aggiungere quel dettaglio alla tua domanda e potrebbe suscitare l'interesse di qualcuno :-) –

risposta

4

Penso di avere due soluzioni possibili. Non sono particolarmente soddisfatto né dell'uno né dell'altro, ma forniscono almeno un'alternativa ragionevole all'approccio APM.

Il primo non soddisfa la vostra esigenza di nessun filo di blocco, ma penso che è piuttosto elegante, in quanto è possibile registrare callback e avranno chiamato round-robin, ma avete ancora la possibilità di chiamare Take o TryTake come faresti normalmente per un BlockingCollection. Questo codice forza la registrazione dei callback ogni volta che viene richiesto un articolo. Questo è il meccanismo di segnalazione per la raccolta. La cosa bella di questo approccio è che le chiamate a Take non vengono affamate come fanno nella mia seconda soluzione.

public class NotifyingBlockingCollection<T> : BlockingCollection<T> 
{ 
    private Thread m_Notifier; 
    private BlockingCollection<Action<T>> m_Callbacks = new BlockingCollection<Action<T>>(); 

    public NotifyingBlockingCollection() 
    { 
     m_Notifier = new Thread(Notify); 
     m_Notifier.IsBackground = true; 
     m_Notifier.Start(); 
    } 

    private void Notify() 
    { 
     while (true) 
     { 
      Action<T> callback = m_Callbacks.Take(); 
      T item = Take(); 
      callback.BeginInvoke(item, null, null); // Transfer to the thread pool. 
     } 
    } 

    public void RegisterForTake(Action<T> callback) 
    { 
     m_Callbacks.Add(callback); 
    } 
} 

Il secondo soddisfa il requisito di nessun thread di blocco. Si noti come trasferisce l'invocazione della richiamata al pool di thread. L'ho fatto perché sto pensando che se fosse eseguito in modo sincrono, i blocchi sarebbero stati trattenuti più a lungo con conseguente bottlenecking di Add e RegisterForTake. L'ho esaminato da vicino e non penso che possa essere attivato dal vivo (sono disponibili sia un elemento che una richiamata, ma la richiamata non viene mai eseguita), ma è consigliabile verificarla per verificare. L'unico problema qui è che una chiamata a Take diventerebbe affamata poiché i callback hanno sempre la priorità.

public class NotifyingBlockingCollection<T> 
{ 
    private BlockingCollection<T> m_Items = new BlockingCollection<T>(); 
    private Queue<Action<T>> m_Callbacks = new Queue<Action<T>>(); 

    public NotifyingBlockingCollection() 
    { 
    } 

    public void Add(T item) 
    { 
     lock (m_Callbacks) 
     { 
      if (m_Callbacks.Count > 0) 
      { 
       Action<T> callback = m_Callbacks.Dequeue(); 
       callback.BeginInvoke(item, null, null); // Transfer to the thread pool. 
      } 
      else 
      { 
       m_Items.Add(item); 
      } 
     } 
    } 

    public T Take() 
    { 
     return m_Items.Take(); 
    } 

    public void RegisterForTake(Action<T> callback) 
    { 
     lock (m_Callbacks) 
     { 
      T item; 
      if (m_Items.TryTake(out item)) 
      { 
       callback.BeginInvoke(item, null, null); // Transfer to the thread pool. 
      } 
      else 
      { 
       m_Callbacks.Enqueue(callback); 
      } 
     } 
    } 
} 
+0

Grazie per la risposta, ma non è leggermente quello che sto cercando. Questo è ciò che sto facendo attualmente, ma con l'APM inserito nella raccolta (il codice che hai fornito). Credo che il nocciolo del mio problema sia che APM non soddisfa i miei requisiti, è solo l'implementazione che ho usato. Le mie richieste richiedono uno schema che fornisca una soluzione alla domanda: "Come posso segnalare una collezione che sono pronto per il prossimo elemento e fare in modo che la raccolta esegua una richiamata quando quell'elemento successivo è arrivato, senza che nessun thread sia bloccato?" – Will

+0

Ho capito che non era quello che cercavi. È comunque un problema interessante. Peccato che 'Add' non sia' virtuale' altrimenti potresti essere stato in grado di iniettare la notifica in qualche modo. Forse potresti utilizzare una delle implementazioni della coda di blocco come punto di partenza. Il problema è che devi stare attento a come spedisci la notifica altrimenti un altro consumatore avrà afferrato l'oggetto per primo. Potrei giocarci oggi se avrò tempo. Pubblica una risposta da solo se la capisci. Non lo so ... potresti trovare più facile punt e fare semplicemente riferimento a un'altra libreria. –

+0

La notifica deve contenere l'elemento successivo e deve essere controllata dal notificatore. Forse l'idea che si tratti di una collezione è errata; solo tramite questo meccanismo può essere fornito il prossimo oggetto, evitando così il problema di due osservatori che si contendono un singolo oggetto. In altre parole, un osservatore non può utilizzare il meccanismo A per ottenere l'elemento successivo (ad esempio, 'T Pop()') mentre l'altro è registrato per una richiamata. – Will

3

Che ne dici di qualcosa di simile? (Probabilmente il naming potrebbe usare un po 'di lavoro e notare che questo non è stato verificato.)

public class CallbackCollection<T> 
{ 
    // Sychronization object to prevent race conditions. 
    private object _SyncObject = new object(); 

    // A queue for callbacks that are waiting for items. 
    private ConcurrentQueue<Action<T>> _Callbacks = new ConcurrentQueue<Action<T>>(); 

    // A queue for items that are waiting for callbacks. 
    private ConcurrentQueue<T> _Items = new ConcurrentQueue<T>(); 

    public void Add(T item) 
    { 
     Action<T> callback; 
     lock (_SyncObject) 
     { 
      // Try to get a callback. If no callback is available, 
      // then enqueue the item to wait for the next callback 
      // and return. 
      if (!_Callbacks.TryDequeue(out callback)) 
      { 
       _Items.Enqueue(item); 
       return; 
      } 
     } 

     ExecuteCallback(callback, item); 
    } 

    public void TakeAndCallback(Action<T> callback) 
    { 
     T item; 
     lock(_SyncObject) 
     { 
      // Try to get an item. If no item is available, then 
      // enqueue the callback to wait for the next item 
      // and return. 
      if (!_Items.TryDequeue(out item)) 
      { 
       _Callbacks.Enqueue(callback); 
       return; 
      } 
     } 
     ExecuteCallback(callback, item); 
    } 

    private void ExecuteCallback(Action<T> callback, T item) 
    { 
     // Use a new Task to execute the callback so that we don't 
     // execute it on the current thread. 
     Task.Factory.StartNew(() => callback.Invoke(item)); 
    } 
} 
+0

Appena aggiornato e visto @ Brian's NotifyingBlockingCollection. Sembra che lui e io abbiamo inventato più o meno la stessa soluzione allo stesso tempo. –

+0

Sì, stavamo sicuramente pensando in questo modo, in particolare la parte su come ottenere l'invocazione del callback dal thread corrente. –