2012-08-14 21 views
16

Sto utilizzando un BlockingCollection per implementare un modello produttore/consumatore. Ho un ciclo asincrono che riempie la collezione con i dati da elaborare che possono poi essere raggiunti dal client in un momento successivo. I pacchetti arrivano scarsamente e vorrei che il polling fosse fatto senza usare una chiamata bloccante.Asincrono Presa dal blocco della raccolta

In sostanza, sto cercando qualcosa come un BeginTake e EndTake che non esiste nella raccolta di blocco in modo che io possa fare uso del pool di thread interno in un callback. Non deve essere un BlockingCollection con qualsiasi mezzo. Tutto ciò che fa ciò di cui ho bisogno sarebbe fantastico.

Questo è quello che ho ora. _bufferedPackets è un BlockingCollection<byte[]>:

public byte[] Read(int timeout) 
{ 
    byte[] result; 
    if (_bufferedPackets.IsCompleted) 
    { 
     throw new Exception("Out of packets"); 
    } 
    _bufferedPackets.TryTake(out result, timeout);  
    return result; 
} 

mi piacerebbe che questo sia qualcosa di simile, in pseudocodice:

public void Read(int timeout) 
{ 
    _bufferedPackets.BeginTake(result => 
     { 
      var bytes = _bufferedPackets.EndTake(result); 
      // Process the bytes, or the resuting timeout 
     }, timeout, _bufferedPackets); 
} 

Quali sono le opzioni per questo? Non voglio posizionare lo qualsiasi thread in uno stato di attesa, dal momento che ci sono molti altri elementi di I/O da elaborare, e mi troverei a corto di thread abbastanza rapidamente.

Aggiornamento: Ho riscritto il codice in questione per utilizzare il processo asincrono in modo diverso, essenzialmente scambiando i callback in base a se c'è una richiesta in attesa entro il limite di timeout. Funziona bene, ma sarebbe comunque fantastico se ci fosse un modo per farlo senza ricorrere ai timer e scambiando lambda attorno a chi potenzialmente causa condizioni di gara ed è difficile da scrivere (e capire). Ho risolto questo anche con una propria implementazione di una coda asincrona, ma sarebbe comunque fantastico se esistesse un'opzione più standard e ben testata.

+0

Al momento, penso che nessuna raccolta di TPL fornisca metodi asincroni ad eccezione di ObservableCollection per l'interfaccia utente. Cosa ne pensi ? –

+0

È possibile eseguire il wrapping in un 'Task task = Task.Factory.StartNew (() => {// Il codice restituisce byte []});' tuttavia questo non è elligente e deve esserci un modo migliore .. – MoonKnight

+0

Il wrapping in un'attività comporterà un'attività che verrà bloccata in un handle di attesa. Dal momento che ci sono molti compiti in corso che occuperanno un compito per sempre che mi farà esaurire i compiti nella piscina, purtroppo. – Dervall

risposta

0

Quindi non sembra essere un'opzione incorporata per questo, sono uscito e ho cercato di fare del mio meglio per fare quello che volevo come esperimento. Risulta che c'è molto da fare per rendere questo lavoro più o meno come gli altri utenti del vecchio modello asincrono.

public class AsyncQueue<T> 
{ 
    private readonly ConcurrentQueue<T> queue; 
    private readonly ConcurrentQueue<DequeueAsyncResult> dequeueQueue; 

    private class DequeueAsyncResult : IAsyncResult 
    { 
     public bool IsCompleted { get; set; } 
     public WaitHandle AsyncWaitHandle { get; set; } 
     public object AsyncState { get; set; } 
     public bool CompletedSynchronously { get; set; } 
     public T Result { get; set; } 

     public AsyncCallback Callback { get; set; } 
    } 

    public AsyncQueue() 
    { 
     dequeueQueue = new ConcurrentQueue<DequeueAsyncResult>(); 
     queue = new ConcurrentQueue<T>(); 
    } 

    public void Enqueue(T item) 
    { 
     DequeueAsyncResult asyncResult; 
     while (dequeueQueue.TryDequeue(out asyncResult)) 
     { 
      if (!asyncResult.IsCompleted) 
      { 
       asyncResult.IsCompleted = true; 
       asyncResult.Result = item; 

       ThreadPool.QueueUserWorkItem(state => 
       { 
        if (asyncResult.Callback != null) 
        { 
         asyncResult.Callback(asyncResult); 
        } 
        else 
        { 
         ((EventWaitHandle) asyncResult.AsyncWaitHandle).Set(); 
        } 
       }); 
       return; 
      } 
     } 
     queue.Enqueue(item); 
    } 

    public IAsyncResult BeginDequeue(int timeout, AsyncCallback callback, object state) 
    { 
     T result; 
     if (queue.TryDequeue(out result)) 
     { 
      var dequeueAsyncResult = new DequeueAsyncResult 
      { 
       IsCompleted = true, 
       AsyncWaitHandle = new EventWaitHandle(true, EventResetMode.ManualReset), 
       AsyncState = state, 
       CompletedSynchronously = true, 
       Result = result 
      }; 
      if (null != callback) 
      { 
       callback(dequeueAsyncResult); 
      } 
      return dequeueAsyncResult; 
     } 

     var pendingResult = new DequeueAsyncResult 
     { 
      AsyncState = state, 
      IsCompleted = false, 
      AsyncWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset), 
      CompletedSynchronously = false, 
      Callback = callback 
     }; 
     dequeueQueue.Enqueue(pendingResult); 
     Timer t = null; 
     t = new Timer(_ => 
     { 
      if (!pendingResult.IsCompleted) 
      { 
       pendingResult.IsCompleted = true; 
       if (null != callback) 
       { 
        callback(pendingResult); 
       } 
       else 
       { 
        ((EventWaitHandle)pendingResult.AsyncWaitHandle).Set(); 
       } 
      } 
      t.Dispose(); 
     }, new object(), timeout, Timeout.Infinite); 

     return pendingResult; 
    } 

    public T EndDequeue(IAsyncResult result) 
    { 
     var dequeueResult = (DequeueAsyncResult) result; 
     return dequeueResult.Result; 
    } 
} 

io non sono troppo sicuro circa la sincronizzazione della proprietà IsComplete, e io non sono troppo caldo su come il dequeueQueue unica viene pulita su successive Enqueue chiamate. Non sono sicuro di quando è il momento giusto per segnalare le maniglie di attesa, ma questa è la soluzione migliore che ho ottenuto finora.

Si prega di non considerare questo codice di qualità della produzione con qualsiasi mezzo.Volevo solo mostrare l'essenza generale di come sono riuscito a far girare tutti i thread senza aspettare le serrature. Sono sicuro che questo è pieno di tutti i tipi di casi limite e bug, ma soddisfa i requisiti e volevo restituire qualcosa alle persone che si imbattono nella domanda.

+0

Non capisco completamente il tuo modello di threading. È possibile accedere in EndDequeue al risultato, indipendentemente dal fatto che la richiamata sia stata completata o scaduta.Se si esegue il looping della proprietà IsCompleted finché non si ottiene una risposta, si sta ancora bloccando un thread. –

+0

Non bloccherà dal momento dell'esecuzione su un timer, che non avvierà un thread ma eseguirà solo le operazioni di coda sul pool. Enddequeue viene chiamato anche nei timeout, il risultato sarebbe di default in quei casi. – Dervall

+0

Intendo dire chi sta chiamando EndDequeue? Il pattern Begin/End assicura che è possibile generare il lavoro ma a un certo punto è necessario chiamare il metodo End corrispondente. Ciò che si fa attualmente è registrare i callback e chiamarli durante il metodo Enqueue e attendere un po 'tramite il timer se nel frattempo alcuni dati arrivano. Puoi farlo senza timer semplicemente disponendo di una coda di osservazione in cui tu, su Enqueue, memorizzi il loro tempo di attesa. Quando arrivano i dati, rimuovi tutti gli osservatori troppo vecchi e li salta e richiama la richiamata che non è ancora scaduta. –

0

Posso essere frainteso la situazione, ma non è possibile utilizzare una raccolta non bloccante?

Ho creato questo esempio per illustrare:

using System; 
using System.Collections.Concurrent; 
using System.Threading; 
using System.Threading.Tasks; 

namespace AsyncTakeFromBlockingCollection 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      var queue = new ConcurrentQueue<string>(); 

      var producer1 = Task.Factory.StartNew(() => 
      { 
       for (int i = 0; i < 10; i += 1) 
       { 
        queue.Enqueue("======="); 
        Thread.Sleep(10); 
       } 
      }); 

      var producer2 = Task.Factory.StartNew(() => 
      { 
       for (int i = 0; i < 10; i += 1) 
       { 
        queue.Enqueue("*******"); 
        Thread.Sleep(3); 
       } 
      }); 

      CreateConsumerTask("One ", 3, queue); 
      CreateConsumerTask("Two ", 4, queue); 
      CreateConsumerTask("Three", 7, queue); 

      producer1.Wait(); 
      producer2.Wait(); 
      Console.WriteLine(" Producers Finished"); 
      Console.ReadLine(); 
     } 

     static void CreateConsumerTask(string taskName, int sleepTime, ConcurrentQueue<string> queue) 
     { 
      Task.Factory.StartNew(() => 
      { 
       while (true) 
       { 
        string result; 
        if (queue.TryDequeue(out result)) 
        { 
         Console.WriteLine(" {0} consumed {1}", taskName, result); 
        } 
        Thread.Sleep(sleepTime); 
       } 
      }); 
     } 
    } 
} 

Ecco l'output del programma

enter image description here

Credo che il BlockingCollection è destinato ad avvolgere una collezione concorrente e fornire un meccanismo per consentire a più i consumatori devono bloccare; in attesa di produttori. Questo utilizzo sembra essere contrario alle tue esigenze.

Ho trovato questo article about the BlockingCollection class per essere utile.

+0

Purtroppo non posso davvero farlo. Ci sono molte code separate che vengono riempite molto scarsamente con il completamento dell'IO. Ce ne possono essere potenzialmente migliaia. Questi vengono quindi utilizzati solo quando si verifica un altro evento IO, che viene eseguito in una richiamata di completamento dell'IO e quindi non può essere bloccato. In sostanza ci sono produttori e consumatori poco frequenti, tutti in esecuzione sul completamento dell'IO. Il consumatore deve sapere se la raccolta ha avuto elementi aggiunti entro un determinato timeout senza mettersi in una chiamata bloccante. – Dervall

+0

Come vengono mappati i consumatori nelle code? Un consumatore per ogni coda? I consumatori iterano attraverso le code alla ricerca di prodotti? – rtev

+0

I consumatori arrivano in modo asincrono su richiami IO e sono tenuti a estrarre le cose dalla raccolta o attendere un determinato periodo di tempo prima di rinunciare se la raccolta non è piena di dati durante tale periodo. Nessuna iterazione, è praticamente una coda – Dervall

0

Sono abbastanza sicuro che lo BlockingCollection<T> non è in grado di farlo, dovresti eseguire il rollover. Sono arrivato fino a questo:

class NotifyingCollection<T> 
{ 
    private ConcurrentQueue<Action<T>> _subscribers = new ConcurrentQueue<Action<T>>(); 
    private ConcurrentQueue<T> _overflow = new ConcurrentQueue<T>(); 

    private object _lock = new object(); 

    public void Add(T item) 
    { 
     _overflow.Enqueue(item); 
     Dispatch(); 
    } 

    private void Dispatch() 
    { 
     // this lock is needed since we need to atomically dequeue from both queues... 
     lock (_lock) 
     { 
      while (_overflow.Count > 0 && _subscribers.Count > 0) 
      { 
       Action<T> callback; 
       T item; 

       var r1 = _overflow.TryDequeue(out item); 
       var r2 = _subscribers.TryDequeue(out callback); 

       Debug.Assert(r1 && r2); 
       callback(item); 
       // or, optionally so that the caller thread's doesn't take too long ... 
       Task.Factory.StartNew(() => callback(item)); 
       // but you'll have to consider how exceptions will be handled. 
      } 
     } 
    } 

    public void TakeAsync(Action<T> callback) 
    { 
     _subscribers.Enqueue(callback); 
     Dispatch(); 
    } 
} 

Ho usato il thread che chiama TakeAsync() o Add() per servire come il filo di richiamata. Quando si chiama Add() o TakeAsync(), tenterà di inviare tutti gli elementi in coda ai callback in coda. In questo modo non è stato creato alcun thread che siediti lì a dormire, in attesa di essere segnalato.

Questo blocco è un po 'brutto, ma sarete in grado di accodare e sottoscrivere su più thread senza bloccare. Non riesco a capire un modo per fare l'equivalente di ne deseleziono solo uno se c'è qualcosa disponibile nell'altra coda senza usare quel blocco.

Nota: l'ho provato solo in minima parte, con alcuni fili.

+0

Beh, sì. L'unico problema è che creerai un thread bloccato usando il tuo approccio che sarebbe pericoloso considerando un carico molto elevato. I blocchi di chiamata 'TryTake'. In una situazione in cui si hanno migliaia di queste chiamate, si esauriscono i thread nel pool di thread di attività e si blocca l'applicazione. – Dervall

+0

Ah! non volevi assolutamente bloccare i fili; Pensavo volessi solo assicurarmi che il thread chiamante non venisse bloccato. Lo capisco adesso. – atanamir

+0

OK, ho modificato la mia risposta con una che penso farà quello che stai cercando ... – atanamir

Problemi correlati