2012-06-13 2 views
28

utilizzando Reactive Extensions, voglio ignorare i messaggi provenienti dal mio flusso di eventi che si verificano mentre il mio metodo Subscribe è in esecuzione. Cioè a volte mi ci vuole più tempo per elaborare un messaggio rispetto al tempo tra un messaggio e l'altro, quindi voglio lasciare i messaggi che non ho il tempo di elaborare.Con Rx, come faccio a ignorare tutto tranne il valore più recente quando il mio metodo di iscrizione è in esecuzione

Tuttavia, quando il mio metodo Subscribe viene completato, se si sono verificati messaggi, desidero elaborare l'ultimo. Quindi elaboro sempre il messaggio più recente.

Quindi, se ho un po 'di codice che fa:

messages.OnNext(100); 
messages.OnNext(1); 
messages.OnNext(2); 

e se assumiamo il '100' richiede molto tempo per elaborare. Quindi voglio che il '2' venga elaborato quando il '100' è completato. Il '1' dovrebbe essere ignorato perché è stato sostituito dal '2' mentre il '100' era ancora in elaborazione.

Ecco un esempio del risultato che voglio utilizzare un task in background e Latest()

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100)); 

Task.Factory.StartNew(() => 
{ 
    foreach(var n in messages.Latest()) 
    { 
     Thread.Sleep(TimeSpan.FromMilliseconds(250)); 
     Console.WriteLine(n); 
    } 
}); 

Tuttavia, recenti() è una chiamata di blocco e preferirei non avere un filo seduta in attesa che il valore successivo come questo (a volte ci saranno intervalli molto lunghi tra i messaggi).

posso anche ottenere il risultato che voglio utilizzando un BroadcastBlock da TPL Dataflow, come questo:

var buffer = new BroadcastBlock<long>(n => n); 
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n)); 

buffer.AsObservable() 
    .Subscribe(n => 
    { 
     Thread.Sleep(TimeSpan.FromMilliseconds(250)); 
     Console.WriteLine(n); 
    }); 

ma questo si sente come dovrebbe essere possibile direttamente in Rx. Qual è il modo migliore per farlo?

+0

Suona come un lavoro per Window (), anche se qualcuno potrebbe trovare una soluzione più semplice. –

+0

I tuoi eventi devono essere generati indipendentemente dall'abbonamento. –

risposta

3

Grazie a Lee Campbell (di Intro To Rx fama), ora ho una soluzione di lavoro di utilizzare questo metodo di estensione:

public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler) 
{ 
    return Observable.Create<T>(observer => 
    { 
     Notification<T> outsideNotification = null; 
     var gate = new object(); 
     bool active = false; 
     var cancelable = new MultipleAssignmentDisposable(); 
     var disposable = source.Materialize().Subscribe(thisNotification => 
     { 
      bool alreadyActive; 
      lock (gate) 
      { 
       alreadyActive = active; 
       active = true; 
       outsideNotification = thisNotification; 
      } 

      if (!alreadyActive) 
      { 
       cancelable.Disposable = scheduler.Schedule(self => 
       { 
        Notification<T> localNotification = null; 
        lock (gate) 
        { 
         localNotification = outsideNotification; 
         outsideNotification = null; 
        } 
        localNotification.Accept(observer); 
        bool hasPendingNotification = false; 
        lock (gate) 
        { 
         hasPendingNotification = active = (outsideNotification != null); 
        } 
        if (hasPendingNotification) 
        { 
         self(); 
        } 
       }); 
      } 
     }); 
     return new CompositeDisposable(disposable, cancelable); 
    }); 
} 
+0

Qual è lo scopo di "Materializzare" e usare le "Notifiche" rispetto alla semplice memorizzazione del valore stesso? Dai miei test sembra che funzioni come previsto per tenere traccia del solo valore - ma forse mi mancano alcuni aspetti fondamentali. –

+2

@AndrewHanlon utilizzando la notifica anziché solo il valore è per gestire le eccezioni, altrimenti non verranno trasmessi correttamente sul canale OnError. – Wilka

+0

Ah, questo ha senso! Grazie. –

3

Ecco un tentativo utilizzando "solo" Rx. Il timer e l'abbonato sono mantenuti indipendenti osservando sul threadpool e ho usato un oggetto per fornire un feedback sul completamento dell'attività.

Non penso che sia una soluzione semplice, ma spero che possa darti idee per migliorare.

messages. 
    Buffer(() => feedback). 
    Select(l => l.LastOrDefault()). 
    ObserveOn(Scheduler.ThreadPool). 
    Subscribe(n => 
    { 
     Thread.Sleep(TimeSpan.FromMilliseconds(250)); 
     Console.WriteLine(n); 
     feedback.OnNext(Unit.Default); 
    }); 

feedback.OnNext(Unit.Default); 

C'è un piccolo problema: il buffer viene prima chiuso quando è vuoto, quindi genera il valore predefinito. Probabilmente potresti risolverlo facendo il feedback dopo il primo messaggio.


Qui è come una funzione di estensione:

public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action) 
{ 
    var feedback = new Subject<Unit>(); 

    var sub = source. 
     Buffer(() => feedback). 
     ObserveOn(Scheduler.ThreadPool). 
     Subscribe(l => 
     { 
      action(l.LastOrDefault()); 
      feedback.OnNext(Unit.Default); 
     }); 

    feedback.OnNext(Unit.Default); 

    return sub; 
} 

e di utilizzo:

messages.SubscribeWithoutOverlap(n => 
    { 
     Thread.Sleep(1000); 
     Console.WriteLine(n); 
    }); 
+0

Non vuoi 'LastOrDefault' invece di' FirstOrDefault'? – yamen

+0

@yamen Probabilmente è ragionevole –

8

Qui è un metodo che è simile a Dave, ma utilizza Sample invece (che è più appropriato di buffer). Ho incluso un metodo di estensione simile a quello che ho aggiunto alla risposta di Dave.

L'estensione:

public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action) 
{ 
    var sampler = new Subject<Unit>(); 

    var sub = source. 
     Sample(sampler). 
     ObserveOn(Scheduler.ThreadPool). 
     Subscribe(l => 
     { 
      action(l); 
      sampler.OnNext(Unit.Default); 
     }); 

    // start sampling when we have a first value 
    source.Take(1).Subscribe(_ => sampler.OnNext(Unit.Default)); 

    return sub; 
} 

Si noti che è più semplice, e non c'è alcun buffer di 'vuoto' che è licenziato. Il primo elemento che viene inviato all'azione proviene effettivamente dallo stream stesso.

L'uso è semplice:

messages.SubscribeWithoutOverlap(n => 
{ 
    Console.WriteLine("start: " + n); 
    Thread.Sleep(500); 
    Console.WriteLine("end: " + n); 
}); 

messages.Subscribe(x => Console.WriteLine("source: " + x)); // for testing 

E i risultati:

source: 0 
start: 0 
source: 1 
source: 2 
source: 3 
source: 4 
source: 5 
end: 0 
start: 5 
source: 6 
source: 7 
source: 8 
source: 9 
source: 10 
end: 5 
start: 10 
source: 11 
source: 12 
source: 13 
source: 14 
source: 15 
end: 10 
+3

Questo ha un problema nel fatto che se il sorgente non ha inserito nulla nel buffer di campionamento nel punto in cui viene chiamato sampler.OnNext, il sistema entra in uno stato in cui non genererà più valori. Ho fatto una variazione su questo utilizzando Switch invece di sample http://stackoverflow.com/a/15876519/158285 – bradgonesurfing

+0

non dovrebbe restituire * IDisposable * restituito * allo stesso scopo * Subject *? – superjos

1

Ecco un'implementazione Task base, con la semantica di cancellazione, che non utilizzano un soggetto. Chiamando dispose consente all'azione sottoscritta di annullare l'elaborazione, se lo si desidera.

public static IDisposable SampleSubscribe<T>(this IObservable<T> observable, Action<T, CancellationToken> action) 
    { 
     var cancellation = new CancellationDisposable(); 
     var token = cancellation.Token; 
     Task task = null; 

     return new CompositeDisposable(
      cancellation, 
      observable.Subscribe(value => 
      { 
       if (task == null || task.IsCompleted) 
        task = Task.Factory.StartNew(() => action(value, token), token); 
      }) 
     ); 
    } 

Ecco un semplice test:

Observable.Interval(TimeSpan.FromMilliseconds(150)) 
         .SampleSubscribe((v, ct) => 
         { 
          //cbeck for cancellation, do work 
          for (int i = 0; i < 10 && !ct.IsCancellationRequested; i++) 
           Thread.Sleep(100); 

          Console.WriteLine(v); 
         }); 

L'output:

0 
7 
14 
21 
28 
35 
1

Con Rx 2.0 RC è possibile utilizzare Chunkify per ottenere un IEnumerable di liste, ciascuna contenente quanto osservato dal l'ultimo MoveNext.

È quindi possibile utilizzare ToObservable per riconvertirlo in IObservable e prestare attenzione solo all'ultima voce in ogni elenco non vuoto.

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100)); 

messages.Chunkify() 
     .ToObservable(Scheduler.TaskPool) 
     .Where(list => list.Any()) 
     .Select(list => list.Last()) 
     .Subscribe(n => 
     { 
      Thread.Sleep(TimeSpan.FromMilliseconds(250)); 
      Console.WriteLine(n); 
     }); 
+2

Funziona, ma lascia un filo girare per estrarre le cose dall'osservabile (così una delle mie CPU viene espulsa) – Wilka

+0

E sta costruendo una lista piena di valori che potresti ignorare. L'estensione ObserveLatestOn evita questo: nessun elenco, nessuna allocazione dalla crescita dell'elenco, nessun riferimento che mantiene vivi i vecchi messaggi di notifica. –

2

Un esempio che utilizza Observable.Switch. Gestisce anche il caso quando si completa l'attività, ma non c'è nulla di nella coda.

using System.Reactive.Linq; 
using System.Reactive.Subjects; 
using System.Reactive.Concurrency; 
using System.Reactive.Disposables; 

namespace System.Reactive 
{ 
    public static class RXX 
    { 
     public static IDisposable SubscribeWithoutOverlap<T> 
     (this IObservable<T> source 
     , Action<T> action 
     , IScheduler scheduler = null) 
     { 
      var sampler = new Subject<Unit>(); 
      scheduler = scheduler ?? Scheduler.Default; 
      var p = source.Publish(); 
      var connection = p.Connect(); 

      var subscription = sampler.Select(x=>p.Take(1)) 
       .Switch() 
       .ObserveOn(scheduler) 
       .Subscribe(l => 
       { 
        action(l); 
        sampler.OnNext(Unit.Default); 
       }); 

      sampler.OnNext(Unit.Default); 

      return new CompositeDisposable(connection, subscription); 
     } 
    } 
} 
+0

Ho appena notato che questo può perdere i valori. Cioè non sempre elabora il valore più recente atterra nella coda quando sta già facendo qualcosa. per esempio. https://gist.github.com/WilkaH/5403360 stampa solo "Fatto 100", non "Fatto 2" in seguito (il 1 deve essere eliminato perché sostituito) – Wilka

+0

Dovrebbe ignorare gli elementi che atterrano nella coda quando è attualmente in lavorazione. Non sono sicuro di cosa intendi. – bradgonesurfing

+0

Nel qual caso non ho chiarito nella mia domanda iniziale. Voglio sempre che il nuovo articolo venga elaborato, quindi se arriva mentre qualcos'altro sta elaborando, allora quell'elemento dovrebbe essere elaborato al termine di quello corrente (invece di essere perso). – Wilka

3

Ho scritto un post su questo blog con una soluzione che utilizza CAS invece di serrature ed evita la ricorsione. Il codice è al di sotto, ma è possibile trovare una spiegazione completa qui: http://www.zerobugbuild.com/?p=192

public static IObservable<TSource> ObserveLatestOn<TSource>(
    this IObservable<TSource> source, 
    IScheduler scheduler) 
{ 
    return Observable.Create<TSource>(observer => 
    { 
     Notification<TSource> pendingNotification = null; 
     var cancelable = new MultipleAssignmentDisposable(); 

     var sourceSubscription = source.Materialize() 
      .Subscribe(notification => 
      { 
       var previousNotification = Interlocked.Exchange(
        ref pendingNotification, notification); 

       if (previousNotification != null) return; 

       cancelable.Disposable = scheduler.Schedule(() => 
        { 
         var notificationToSend = Interlocked.Exchange(
          ref pendingNotification, null); 
         notificationToSend.Accept(observer); 
        }); 
      }); 
      return new CompositeDisposable(sourceSubscription, cancelable); 
    }); 
} 
2

appena finito (e già completamente rivisto) la mia soluzione al problema, che ho intenzione di usare in produzione.

A meno che lo scheduler utilizza il thread corrente, invita a OnNext, OnCompleted, OnError dalla sorgente dovrebbe restituire immediatamente; se l'osservatore è occupato con le notifiche precedenti, entra in una coda con una dimensione massima specificabile, da cui verrà notificato ogni volta che la notifica precedente è stata elaborata. Se la coda si riempie, gli elementi meno recenti vengono scartati. Quindi, una dimensione massima della coda di 0 ignora tutti gli elementi che arrivano mentre l'osservatore è occupato; una dimensione di 1 permetterà sempre di osservare l'ultimo oggetto; una dimensione fino a int.MaxValue tiene occupato il consumatore fino a quando non raggiunge il produttore.

Se lo scheduler supporta un funzionamento prolungato (ovvero fornisce un thread proprio), pianifico un ciclo per notificare l'osservatore; altrimenti uso la schedulazione ricorsiva.

Ecco il codice. Qualsiasi commento è apprezzato.

partial class MoreObservables 
{ 
    /// <summary> 
    /// Avoids backpressure by enqueuing items when the <paramref name="source"/> produces them more rapidly than the observer can process. 
    /// </summary> 
    /// <param name="source">The source sequence.</param> 
    /// <param name="maxQueueSize">Maximum queue size. If the queue gets full, less recent items are discarded from the queue.</param> 
    /// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>: <see cref="IScheduler"/> on which to observe notifications.</param> 
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception> 
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception> 
    /// <remarks> 
    /// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready. 
    /// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any. 
    /// To observe the whole source sequence, specify <see cref="int.MaxValue"/>. 
    /// </remarks> 
    public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null) 
    { 
     if (source == null) throw new ArgumentNullException(nameof(source)); 
     if (maxQueueSize < 0) throw new ArgumentOutOfRangeException(nameof(maxQueueSize)); 
     if (scheduler == null) scheduler = Scheduler.Default; 

     return Observable.Create<TSource>(observer => LatestImpl<TSource>.Subscribe(source, maxQueueSize, scheduler, observer)); 
    } 

    private static class LatestImpl<TSource> 
    { 
     public static IDisposable Subscribe(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer) 
     { 
      if (observer == null) throw new ArgumentNullException(nameof(observer)); 

      var longrunningScheduler = scheduler.AsLongRunning(); 
      if (longrunningScheduler != null) 
       return new LoopSubscription(source, maxQueueSize, longrunningScheduler, observer); 

      return new RecursiveSubscription(source, maxQueueSize, scheduler, observer); 
     } 

     #region Subscriptions 

     /// <summary> 
     /// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies in a loop. 
     /// </summary> 
     private sealed class LoopSubscription : IDisposable 
     { 
      private enum State 
      { 
       Idle, // nothing to notify 
       Head, // next notification is in _head 
       Queue, // next notifications are in _queue, followed by _completion 
       Disposed, // disposed 
      } 

      private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable(); 
      private readonly IObserver<TSource> _observer; 
      private State _state; 
      private TSource _head; // item in front of the queue 
      private IQueue _queue; // queued items 
      private Notification<TSource> _completion; // completion notification 

      public LoopSubscription(IObservable<TSource> source, int maxQueueSize, ISchedulerLongRunning scheduler, IObserver<TSource> observer) 
      { 
       _observer = observer; 
       _queue = Queue.Create(maxQueueSize); 
       scheduler.ScheduleLongRunning(_ => Loop()); 
       _subscription.Disposable = source.Subscribe(
        OnNext, 
        error => OnCompletion(Notification.CreateOnError<TSource>(error)), 
        () => OnCompletion(Notification.CreateOnCompleted<TSource>())); 
      } 

      private void OnNext(TSource value) 
      { 
       lock (_subscription) 
       { 
        switch (_state) 
        { 
         case State.Idle: 
          _head = value; 
          _state = State.Head; 
          Monitor.Pulse(_subscription); 
          break; 
         case State.Head: 
         case State.Queue: 
          if (_completion != null) return; 
          try { _queue.Enqueue(value); } 
          catch (Exception error) // probably OutOfMemoryException 
          { 
           _completion = Notification.CreateOnError<TSource>(error); 
           _subscription.Dispose(); 
          } 
          break; 
        } 
       } 
      } 

      private void OnCompletion(Notification<TSource> completion) 
      { 
       lock (_subscription) 
       { 
        switch (_state) 
        { 
         case State.Idle: 
          _completion = completion; 
          _state = State.Queue; 
          Monitor.Pulse(_subscription); 
          _subscription.Dispose(); 
          break; 
         case State.Head: 
         case State.Queue: 
          if (_completion != null) return; 
          _completion = completion; 
          _subscription.Dispose(); 
          break; 
        } 
       } 
      } 

      public void Dispose() 
      { 
       lock (_subscription) 
       { 
        if (_state == State.Disposed) return; 

        _head = default(TSource); 
        _queue = null; 
        _completion = null; 
        _state = State.Disposed; 
        Monitor.Pulse(_subscription); 
        _subscription.Dispose(); 
       } 
      } 

      private void Loop() 
      { 
       try 
       { 
        while (true) // overall loop for all notifications 
        { 
         // next notification to emit 
         Notification<TSource> completion; 
         TSource next; // iff completion == null 

         lock (_subscription) 
         { 
          while (true) 
          { 
           while (_state == State.Idle) 
            Monitor.Wait(_subscription); 

           if (_state == State.Head) 
           { 
            completion = null; 
            next = _head; 
            _head = default(TSource); 
            _state = State.Queue; 
            break; 
           } 
           if (_state == State.Queue) 
           { 
            if (!_queue.IsEmpty) 
            { 
             completion = null; 
             next = _queue.Dequeue(); // assumption: this never throws 
             break; 
            } 
            if (_completion != null) 
            { 
             completion = _completion; 
             next = default(TSource); 
             break; 
            } 
            _state = State.Idle; 
            continue; 
           } 
           Debug.Assert(_state == State.Disposed); 
           return; 
          } 
         } 

         if (completion != null) 
         { 
          completion.Accept(_observer); 
          return; 
         } 
         _observer.OnNext(next); 
        } 
       } 
       finally { Dispose(); } 
      } 
     } 

     /// <summary> 
     /// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies recursively. 
     /// </summary> 
     private sealed class RecursiveSubscription : IDisposable 
     { 
      private enum State 
      { 
       Idle, // nothing to notify 
       Scheduled, // emitter scheduled or executing 
       Disposed, // disposed 
      } 

      private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable(); 
      private readonly MultipleAssignmentDisposable _emitter = new MultipleAssignmentDisposable(); // scheduled emit action 
      private readonly IScheduler _scheduler; 
      private readonly IObserver<TSource> _observer; 
      private State _state; 
      private IQueue _queue; // queued items 
      private Notification<TSource> _completion; // completion notification 

      public RecursiveSubscription(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer) 
      { 
       _scheduler = scheduler; 
       _observer = observer; 
       _queue = Queue.Create(maxQueueSize); 
       _subscription.Disposable = source.Subscribe(
        OnNext, 
        error => OnCompletion(Notification.CreateOnError<TSource>(error)), 
        () => OnCompletion(Notification.CreateOnCompleted<TSource>())); 
      } 

      private void OnNext(TSource value) 
      { 
       lock (_subscription) 
       { 
        switch (_state) 
        { 
         case State.Idle: 
          _emitter.Disposable = _scheduler.Schedule(value, EmitNext); 
          _state = State.Scheduled; 
          break; 
         case State.Scheduled: 
          if (_completion != null) return; 
          try { _queue.Enqueue(value); } 
          catch (Exception error) // probably OutOfMemoryException 
          { 
           _completion = Notification.CreateOnError<TSource>(error); 
           _subscription.Dispose(); 
          } 
          break; 
        } 
       } 
      } 

      private void OnCompletion(Notification<TSource> completion) 
      { 
       lock (_subscription) 
       { 
        switch (_state) 
        { 
         case State.Idle: 
          _completion = completion; 
          _emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(completion)); 
          _state = State.Scheduled; 
          _subscription.Dispose(); 
          break; 
         case State.Scheduled: 
          if (_completion != null) return; 
          _completion = completion; 
          _subscription.Dispose(); 
          break; 
        } 
       } 
      } 

      public void Dispose() 
      { 
       lock (_subscription) 
       { 
        if (_state == State.Disposed) return; 

        _emitter.Dispose(); 
        _queue = null; 
        _completion = null; 
        _state = State.Disposed; 
        _subscription.Dispose(); 
       } 
      } 

      private void EmitNext(TSource value, Action<TSource> self) 
      { 
       try { _observer.OnNext(value); } 
       catch { Dispose(); return; } 

       lock (_subscription) 
       { 
        if (_state == State.Disposed) return; 
        Debug.Assert(_state == State.Scheduled); 
        if (!_queue.IsEmpty) 
         self(_queue.Dequeue()); 
        else if (_completion != null) 
         _emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(_completion)); 
        else 
         _state = State.Idle; 
       } 
      } 

      private void EmitCompletion(Notification<TSource> completion) 
      { 
       try { completion.Accept(_observer); } 
       finally { Dispose(); } 
      } 
     } 

     #endregion 

     #region IQueue 

     /// <summary> 
     /// FIFO queue that discards least recent items if size limit is reached. 
     /// </summary> 
     private interface IQueue 
     { 
      bool IsEmpty { get; } 
      void Enqueue(TSource item); 
      TSource Dequeue(); 
     } 

     /// <summary> 
     /// <see cref="IQueue"/> implementations. 
     /// </summary> 
     private static class Queue 
     { 
      public static IQueue Create(int maxSize) 
      { 
       switch (maxSize) 
       { 
        case 0: return Zero.Instance; 
        case 1: return new One(); 
        default: return new Many(maxSize); 
       } 
      } 

      private sealed class Zero : IQueue 
      { 
       // ReSharper disable once StaticMemberInGenericType 
       public static Zero Instance { get; } = new Zero(); 
       private Zero() { } 

       public bool IsEmpty => true; 
       public void Enqueue(TSource item) { } 
       public TSource Dequeue() { throw new InvalidOperationException(); } 
      } 

      private sealed class One : IQueue 
      { 
       private TSource _item; 

       public bool IsEmpty { get; private set; } = true; 

       public void Enqueue(TSource item) 
       { 
        _item = item; 
        IsEmpty = false; 
       } 

       public TSource Dequeue() 
       { 
        if (IsEmpty) throw new InvalidOperationException(); 

        var item = _item; 
        _item = default(TSource); 
        IsEmpty = true; 
        return item; 
       } 
      } 

      private sealed class Many : IQueue 
      { 
       private readonly int _maxSize, _initialSize; 
       private int _deq, _enq; // indices of deque and enqueu positions 
       private TSource[] _buffer; 

       public Many(int maxSize) 
       { 
        if (maxSize < 2) throw new ArgumentOutOfRangeException(nameof(maxSize)); 

        _maxSize = maxSize; 
        if (maxSize == int.MaxValue) 
         _initialSize = 4; 
        else 
        { 
         // choose an initial size that won't get us too close to maxSize when doubling 
         _initialSize = maxSize; 
         while (_initialSize >= 7) 
          _initialSize = (_initialSize + 1)/2; 
        } 
       } 

       public bool IsEmpty { get; private set; } = true; 

       public void Enqueue(TSource item) 
       { 
        if (IsEmpty) 
        { 
         if (_buffer == null) _buffer = new TSource[_initialSize]; 
         _buffer[0] = item; 
         _deq = 0; 
         _enq = 1; 
         IsEmpty = false; 
         return; 
        } 
        if (_deq == _enq) // full 
        { 
         if (_buffer.Length == _maxSize) // overwrite least recent 
         { 
          _buffer[_enq] = item; 
          if (++_enq == _buffer.Length) _enq = 0; 
          _deq = _enq; 
          return; 
         } 

         // increse buffer size 
         var newSize = _buffer.Length >= _maxSize/2 ? _maxSize : 2 * _buffer.Length; 
         var newBuffer = new TSource[newSize]; 
         var count = _buffer.Length - _deq; 
         Array.Copy(_buffer, _deq, newBuffer, 0, count); 
         Array.Copy(_buffer, 0, newBuffer, count, _deq); 
         _deq = 0; 
         _enq = _buffer.Length; 
         _buffer = newBuffer; 
        } 
        _buffer[_enq] = item; 
        if (++_enq == _buffer.Length) _enq = 0; 
       } 

       public TSource Dequeue() 
       { 
        if (IsEmpty) throw new InvalidOperationException(); 

        var result = ReadAndClear(ref _buffer[_deq]); 
        if (++_deq == _buffer.Length) _deq = 0; 
        if (_deq == _enq) 
        { 
         IsEmpty = true; 
         if (_buffer.Length > _initialSize) _buffer = null; 
        } 
        return result; 
       } 

       private static TSource ReadAndClear(ref TSource item) 
       { 
        var result = item; 
        item = default(TSource); 
        return result; 
       } 
      } 
     } 

     #endregion 
    } 
} 
+0

Wow, ci sono molte cose ben documentate. Anche se sto solo rivedendo e non sono interessato all'argomento, penso di doverti ringraziare per aver portato un bel lavoro qui. – YakovL

+0

Prego. Anche se non l'ho fatto per puro altruismo, spero che qualcuno lo troverà utile (e mi aiuta a ottenere la reputazione 50 quindi potrò almeno commentare sui post) – tinudu

Problemi correlati