2013-02-11 12 views
10

Sto provando Rx perché sembra un buon adattamento per il nostro dominio ma la curva di apprendimento mi ha colto di sorpresa.Unione di dati storici e live con Rx

Ho bisogno di unire i dati storici dei prezzi con i dati dei prezzi in tempo reale.

Sto cercando di adattare l'approccio usuale per fare questo nella lingua di Rx:

  1. Iscriviti ai prezzi in tempo reale immediatamente e iniziare il buffering dei valori torno
  2. avviare una richiesta di storica dati dei prezzi (questo deve succedere dopo l'abbonamento ai prezzi in tempo reale quindi non abbiamo lacune nei nostri dati)
  3. Pubblica i prezzi storici al loro ritorno
  4. Una volta ricevuti tutti i dati storici, pubblica il buffer dati in tempo reale, rimuovendo tutti i valori che si sovrappongono con i nostri dati storici all'inizio
  5. Continua la riproduzione dei dati dal feed prezzo dal vivo

Ho questo codice di uomo di paglia disgustoso e non corretta, che sembra funzionare per i casi di test ingenui che ho scritto:

IConnectableObservable<Tick> live = liveService 
    .For(symbol) 
    .Replay(/* Some appropriate buffer size */); 
live.Connect(); 

IObservable<Tick> historical = historyService.For(since, symbol); 

return new[] {historical, live} 
    .Concat() 
    .Where(TicksAreInChronologicalOrder()); 

private static Func1<Tick,bool> TicksAreInChronologicalOrder() 
{ 
    // Some stateful predicate comparing the timestamp of this tick 
    // to the timestamp of the last tick we saw 
} 

Questo ha alcuni inconvenienti

  1. La dimensione buffer di riproduzione appropriato non è noto. L'impostazione di un buffer illimitato non è possibile, questa è una sequenza di lunga durata. Vogliamo davvero una sorta di buffer una tantum che svuota durante la prima chiamata a Iscriviti. Se questo esiste in Rx, non riesco a trovarlo.
  2. Il buffer di riproduzione continuerà ad esistere anche dopo aver passato alla pubblicazione dei prezzi in tempo reale. Non abbiamo bisogno del buffer a questo punto.
  3. Analogamente, il predicato per filtrare i tick sovrapposti non è necessario una volta saltata la sovrapposizione iniziale tra i prezzi storici e quelli live. Voglio davvero fare qualcosa del tipo: live.SkipWhile(tick => tick.Timestamp < /* lazily get last timestamp in historical data */). Wait(this IObservable<TSource>) è utile qui?

Ci deve essere un modo migliore per farlo, ma sto ancora aspettando che il mio cervello suoni Rx come fa FP.

Un'altra opzione che ho considerato di risolvere 1. è scrivere la mia estensione Rx che sarebbe un ISubject che accoda i messaggi fino a quando non ottiene il primo sottoscrittore (e rifiuta gli abbonati dopo quello?). Forse è la strada da percorrere?

+0

Would 'Switch()' lavoro qui? Come in: 'historical.Switch (live)' – AlexFoxGill

risposta

1

Per la cronaca, ecco quello che ho fatto alla fine. Sono ancora molto uno studente Rx, e di ritorno a .Net l'ho visto l'ultima volta alla versione 2.0. Tutto il feedback è molto ben accolto.

L'oggetto Ticks utilizzato di seguito può contenere uno o più valori di tick. Il servizio di dati storici restituisce i dati in diverse zecche.

public class HistoricalAndLivePriceFeed : IPriceFeed 
{ 
    private readonly IPriceFeed history; 
    private readonly IPriceFeed live; 
    private readonly IClock clock; 

    public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live) 
:   this(history, live, new RealClock()) 
     { 
    } 
    public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live, IClock clock) 
    { 
     this.history = history; 
     this.live = live; 
     this.clock = clock; 
    } 

    public IObservable<Ticks> For(DateTime since, ISymbol symbol) 
    { 
     return Observable.Create<Ticks>(observer => 
     { 
      var liveStream = Buffer<Ticks>.StartBuffering(live.For(since, symbol)); 

      var definitelyInHistoricalTicks = clock.Now; 
      // Sleep to make sure that historical data overlaps our live data 
      // If we ever use a data provider with less fresh historical data, we may need to rethink this 
      clock.Wait(TimeSpan.FromSeconds(1)); 

      var liveStreamAfterEndOfHistoricalTicks = liveStream 
       .SkipWhile(ticks => ticks.LastTimestamp <= definitelyInHistoricalTicks) 
       .Select(ticks => ticks.RemoveBefore(definitelyInHistoricalTicks + 1)); 

      var subscription = history.For(since, symbol) 
       .Select(historicalTicks => historicalTicks.RemoveAtOrAfter(definitelyInHistoricalTicks + 1)) 
       .Concat(liveStreamAfterEndOfHistoricalTicks) 
       .Subscribe(observer); 

      return liveStream.And(subscription); 
     }); 
    } 
} 
public static class CompositeDisposableExtensions 
{ 
    public static CompositeDisposable And(this IDisposable disposable, Action action) 
    { 
     return And(disposable, Disposable.Create(action)); 
    } 

    public static CompositeDisposable And(this IDisposable disposable, IDisposable other) 
    { 
     return new CompositeDisposable(disposable, other); 
    } 
} 

che utilizza questo codice Rx, che io ancora non abbastanza fiducia:

using System; 
using System.Collections.Generic; 
using System.Reactive.Disposables; 
using System.Reactive.Subjects; 

namespace My.Rx 
{ 
    /// <summary> 
    /// Buffers values from an underlying observable when no observers are subscribed. 
    /// 
    /// On Subscription, any buffered values will be replayed. 
    /// 
    /// Only supports one observer for now. 
    /// 
    /// Buffer is an ISubject for convenience of implementation but IObserver methods 
    /// are hidden. It is not intended that Buffer should be used as an IObserver, 
    /// except through StartBuffering() and it is dangerous to do so because none of 
    /// the IObserver methods check whether Buffer has been disposed. 
    /// </summary> 
    /// <typeparam name="TSource"></typeparam> 
    public class Buffer<TSource> : ISubject<TSource>, IDisposable 
    { 
     private readonly object gate = new object(); 
     private readonly Queue<TSource> queue = new Queue<TSource>(); 

     private bool isDisposed; 
     private Exception error; 
     private bool stopped; 
     private IObserver<TSource> observer = null; 
     private IDisposable subscription; 

     public static Buffer<TSource> StartBuffering(IObservable<TSource> observable) 
     { 
      return new Buffer<TSource>(observable); 
     } 

     private Buffer(IObservable<TSource> observable) 
     { 
      subscription = observable.Subscribe(this); 
     } 

     void IObserver<TSource>.OnNext(TSource value) 
     { 
      lock (gate) 
      { 
       if (stopped) return; 
       if (IsBuffering) 
        queue.Enqueue(value); 
       else 
        observer.OnNext(value); 
      } 
     } 

     void IObserver<TSource>.OnError(Exception error) 
     { 
      lock (gate) 
      { 
       if (stopped) return; 
       if (IsBuffering) 
        this.error = error; 
       else 
        observer.OnError(error); 
       stopped = true; 
      } 
     } 

     void IObserver<TSource>.OnCompleted() 
     { 
      lock (gate) 
      { 
       stopped = true; 
      } 
     } 

     public IDisposable Subscribe(IObserver<TSource> observer) 
     { 
      lock (gate) 
      { 
       if (isDisposed) 
        throw new ObjectDisposedException(string.Empty); 

       if (this.observer != null) 
        throw new NotImplementedException("A Buffer can currently only support one observer at a time"); 

       while(!queue.IsEmpty()) 
       { 
        observer.OnNext(queue.Dequeue()); 
       } 

       if (error != null) 
        observer.OnError(error); 
       else if (stopped) 
        observer.OnCompleted(); 

       this.observer = observer; 
       return Disposable.Create(() => 
              { 
               lock (gate) 
               { 
                      // Go back to buffering 
                this.observer = null; 
               } 
              }); 
      } 
     } 

     private bool IsBuffering 
     { 
      get { return observer == null; } 
     } 


     public void Dispose() 
     { 
      lock (gate) 
      { 
       subscription.Dispose(); 

       isDisposed = true; 
       subscription = null; 
       observer = null; 
      } 
     } 
    } 
} 

Che passa questi test (non ho preso la briga di sicurezza filo controllo ancora):

private static readonly Exception exceptionThrownFromUnderlying = new Exception("Hello world"); 

[Test] 
public void ReplaysBufferedValuesToFirstSubscriber() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 
    underlying.OnNext(1); 
    underlying.OnNext(2); 

    var observed = new List<int>(); 

    buffer.Subscribe(Observer.Create<int>(observed.Add)); 

    Assert.That(observed, Is.EquivalentTo(new []{1,2})); 
} 

[Test] 
public void PassesNewValuesToObserver() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    var observed = new List<int>(); 
    buffer.Subscribe(Observer.Create<int>(observed.Add)); 

    underlying.OnNext(1); 
    underlying.OnNext(2); 

    Assert.That(observed, Is.EquivalentTo(new[] { 1, 2 })); 
} 


[Test] 
public void DisposesOfSubscriptions() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    var observed = new List<int>(); 

    buffer.Subscribe(Observer.Create<int>(observed.Add)) 
     .Dispose(); 

    underlying.OnNext(1); 

    Assert.That(observed, Is.Empty); 
} 

[Test] 
public void StartsBufferingAgainWhenSubscriptionIsDisposed() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    // These should be buffered 
    underlying.OnNext(1); 
    underlying.OnNext(2); 

    var firstSubscriptionObserved = new List<int>(); 
    using (buffer.Subscribe(Observer.Create<int>(firstSubscriptionObserved.Add))) 
    { 
     // Should be passed through to first subscription 
     underlying.OnNext(3); 
    } 
    Assert.That(firstSubscriptionObserved, Is.EquivalentTo(new[] { 1, 2, 3 })); 

    // First subscription has been disposed- 
    // we should be back to buffering again 
    underlying.OnNext(4); 
    underlying.OnNext(5); 

    var secondSubscriptionObserved = new List<int>(); 
    using (buffer.Subscribe(Observer.Create<int>(secondSubscriptionObserved.Add))) 
    { 
     // Should be passed through to second subscription 
     underlying.OnNext(6); 
    } 
    Assert.That(secondSubscriptionObserved, Is.EquivalentTo(new[] { 4, 5 ,6})); 
} 

[Test] 
public void DoesNotSupportTwoConcurrentObservers() 
{ 
    // Use .Publish() if you need to do this 

    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    buffer.Subscribe(Observer.Create<int>(i => { })); 

    Assert.Throws<NotImplementedException>(() => buffer.Subscribe(Observer.Create<int>(i => { }))); 
} 

[Test] 
public void CannotBeUsedAfterDisposal() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 
    buffer.Dispose(); 

    Assert.Throws<ObjectDisposedException>(() => buffer.Subscribe(Observer.Create<int>(i => { }))); 
} 

[Test] 
public void ReplaysBufferedError() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    underlying.OnNext(1); 
    underlying.OnError(exceptionThrownFromUnderlying); 

    var observed = new List<int>(); 
    Exception foundException = null; 
    buffer.Subscribe(
     observed.Add, 
     e => foundException = e); 

    Assert.That(observed, Is.EquivalentTo(new []{1})); 
    Assert.That(foundException, Is.EqualTo(exceptionThrownFromUnderlying)); 
} 

[Test] 
public void ReplaysBufferedCompletion() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    underlying.OnNext(1); 
    underlying.OnCompleted(); 

    var observed = new List<int>(); 
    var completed = false; 
    buffer.Subscribe(
     observed.Add, 
     () => completed=true); 

    Assert.That(observed, Is.EquivalentTo(new[] { 1 })); 
    Assert.True(completed); 
} 

[Test] 
public void ReplaysBufferedErrorToSubsequentObservers() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    underlying.OnNext(1); 
    underlying.OnError(exceptionThrownFromUnderlying); 

    // Drain value queue 
    using (buffer.Subscribe(Observer.Create<int>(i => { }, e => { }))) ; 

    var observered = new List<int>(); 
    Exception exceptionEncountered = null; 
    using (buffer.Subscribe(Observer.Create<int>(observered.Add, e=>exceptionEncountered=e))); 

    Assert.That(observered, Is.Empty); 
    Assert.That(exceptionEncountered, Is.EqualTo(exceptionThrownFromUnderlying)); 
} 

[Test] 
public void ReplaysBufferedCompletionToSubsequentObservers() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    underlying.OnNext(1); 
    underlying.OnCompleted(); 

    // Drain value queue 
    using (buffer.Subscribe(Observer.Create<int>(i => { }))) ; 

    var observered = new List<int>(); 
    var completed = false; 
    using (buffer.Subscribe(Observer.Create<int>(observered.Add,()=>completed=true))); 

    Assert.That(observered, Is.Empty); 
    Assert.True(completed); 
} 



[Test] 
public void DisposingOfBufferDisposesUnderlyingSubscription() 
{ 
    var underlyingSubscriptionWasDisposed = false; 
    var underlying = Observable.Create<int>(observer => Disposable.Create(() => underlyingSubscriptionWasDisposed= true )); 

    var buffer = Buffer<int>.StartBuffering(underlying); 
    buffer.Dispose(); 

    Assert.True(underlyingSubscriptionWasDisposed); 
} 
0

Se i dati storici e in tempo reale sono entrambi-time o-scheduler-based, cioè il flusso di eventi assomiglia a questo nel corso del tempo:

|----------------------------------------------------> time 
    h h h h h h         historical 
       l l l l l l      live 

È possibile utilizzare un semplice TakeUntil costrutto:

var historicalStream = <fetch historical data>; 
var liveStream = <fetch live data>; 

var mergedWithoutOverlap = 
    // pull from historical 
    historicalStream 
     // until we start overlapping with live 
     .TakeUntil(liveStream) 
     // then continue with live data 
     .Concat(liveStream); 

Se si ottengono tutti i dati storici tutti in una volta, come un IEnumerable<T>, è possibile utilizzare una combinazione di StartWith e la vostra altra logica:

var historicalData = <get IEnumerable of tick data>; 
var liveData = <get IObservable of tick data>; 

var mergedWithOverlap = 
    // the observable is the "long running" feed 
    liveData 
    // But we'll inject the historical data in front of it 
    .StartWith(historicalData) 
    // Perform filtering based on your needs 
    .Where(....); 
1

ne dite qualcosa di simile:

public static IObservable<T> CombineWithHistory<T, TSelectorResult>(this IObservable<T> live, IObservable<T> history, Func<T, TSelectorResult> selector) 
{ 
    var replaySubject = new ReplaySubject<T>(); 
    live.Subscribe(replaySubject); 
    return history.Concat(replaySubject).Distinct(selector); 
} 

Questo utilizza un ID sequenza e distinto per filtrare i duplicati.

E i test corrispondenti:

var testScheduler = new TestScheduler(); 

var history = testScheduler.CreateColdObservable(
    OnNext(1L, new PriceTick { PriceId = 1 }), 
    OnNext(2L, new PriceTick { PriceId = 2 }), 
    OnNext(3L, new PriceTick { PriceId = 3 }), 
    OnNext(4L, new PriceTick { PriceId = 4 }), 
    OnCompleted(new PriceTick(), 5L)); 

var live = testScheduler.CreateHotObservable(
    OnNext(1L, new PriceTick { PriceId = 3 }), 
    OnNext(2L, new PriceTick { PriceId = 4 }), 
    OnNext(3L, new PriceTick { PriceId = 5 }), 
    OnNext(4L, new PriceTick { PriceId = 6 }), 
    OnNext(5L, new PriceTick { PriceId = 7 }), 
    OnNext(6L, new PriceTick { PriceId = 8 }), 
    OnNext(7L, new PriceTick { PriceId = 9 }) 
    ); 


live.Subscribe(pt => Console.WriteLine("Live {0}", pt.PriceId)); 
history.Subscribe(pt => Console.WriteLine("Hist {0}", pt.PriceId),() => Console.WriteLine("C")); 

var combined = live.CombineWithHistory(history, t => t.PriceId); 

combined.Subscribe(pt => Console.WriteLine("Combined {0}", pt.PriceId)); 

testScheduler.AdvanceTo(6L); 

Se si esegue il test, combinato emette prezzo zecche con id 1 a 8.

+0

Grazie a Dave, questo mi ha insegnato alcuni nuovi trucchi con gli scheduler. 1) Poiché i dati tornano in blocchi di uno o più tick anziché di valori singoli, dovremmo selezionare SelectMany prima di chiamare il selettore. Questo non è possibile dati i nostri volumi di dati e requisiti di prestazioni. 2) Per risparmiare memoria, le nostre zecche hanno i timestamp con una precisione di un secondo. Potremmo avere più tick in un solo secondo con lo stesso valore, quindi non è possibile scrivere una funzione selettore semanticamente corretta senza stato. –

+0

Ho effettivamente migliorato questa risposta, perché in realtà l'ho fatto ma ho dimenticato di inserire il codice. Alla fine, ho praticamente messo in coda i tick e poi li ho cancellati al completamento della cronologia. Devi avere un ID di sequenza per assicurarti di non perdere alcun dato. –

+0

Inoltre, il codice che ho dato non soddisfa i tuoi requisiti in quanto il soggetto di riproduzione terrà l'intera cronologia. –

0

Un modo conveniente in termini di memoria e scambi di sovrapposizioni (correttezza).
Aspettando le vostre risposte:

var tradeIds = new HashSet<string>(); 
var replayQuotationTrades = new ReplaySubject<IntradayTrade>(); 
var replaySubscription = _quotationTrades.Subscribe(replayQuotationTrades); 
return _historyTrades 
       .DelaySubscription(TimeSpan.FromMilliseconds(500), _backgroundScheduler) 
       .Do(t => tradeIds.Add(t.TradeId)) 
       .Finally(() => DisposeAndCompleteReplayStream(replaySubscription, replayQuotationTrades)) 
       .Concat(replayQuotationTrades.Where(t => !tradeIds.Contains(t.TradeId))) 
       .Finally(tradeIds.Clear) 
       .Concat(_quotationTrades) 
       .Subscribe(observer);