2013-02-11 9 views
5

Come si può fare in RX una semplice trasformazione di stato di una sequenza?RX: trasformazione di stato della sequenza, ad es. media mobile esponenziale

Supponiamo di voler realizzare una trasformata esponenziale a media mobile di un IO disturbabile.

Ogni volta noisySequence zecche, emaSequence deve spuntare e restituire il valore (previousEmaSequenceValue * (1-lambda) + latestNoisySequenceValue * lambda)

Credo che usiamo soggetti, ma come esattamente?

public static void Main() 
    { 

     var rand = new Random(); 

     IObservable<double> sequence = Observable 
      .Interval(TimeSpan.FromMilliseconds(1000)) 
      .Select(value => value + rand.NextDouble()); 

     Func<double, double> addNoise = x => x + 10*(rand.NextDouble() - 0.5); 

     IObservable<double> noisySequence = sequence.Select(addNoise); 

     Subject<double> exponentialMovingAverage = new Subject<double>(); // ??? 


     sequence.Subscribe(value => Console.WriteLine("original sequence "+value)); 
     noisySequence.Subscribe(value => Console.WriteLine("noisy sequence " + value)); 
     exponentialMovingAverage.Subscribe(value => Console.WriteLine("ema sequence " + value)); 

     Console.ReadLine(); 
    } 
+1

Per chiarire, sono meno interessato a un metodo specifico per fare un modo medio, ma piuttosto generico per realizzare trasformazioni di stato. – Sputnik2513

risposta

3

Per molti di questi tipi di calcoli, Buffer è il modo più semplice

var movingAverage = noisySequence.Buffer(/*last*/ 3, /*move forward/* 1 /*at a time*/) 
    .Select(x => (x[0] + x[1] + x[2])/3.0); 

Se avete bisogno di portare lo stato in giro, utilizzare l'operatore Scan, che è come Aggregate tranne che produce valori ogni iterazione.

+0

Che funziona in questo specifico esempio di una media. Più in generale, se abbiamo bisogno di stati interni, come meglio farlo in RX? – Sputnik2513

+0

Anche questo non ha stati. Per implementare una media mobile esponenziale, è necessario conoscere l'ultimo valore dell'EMA. – Sputnik2513

+0

Come ha detto Paul, è possibile utilizzare Scansione che consente di aggregare al volo iniziando con uno "stato" iniziale e eseguendo un'azione su quello stato per ciascun OnNext. È quindi possibile selezionare dallo stato il valore che si desidera pubblicare fuori dalla sequenza. Se tuttavia è necessario accedere a qualcosa di più del solo aggregato corrente e del valore più recente, è probabile che gli operatori di Window possano essere utilizzati. –

7

Ecco come è possibile collegare lo stato a una sequenza. In questo caso calcola la media degli ultimi 10 valori.

var movingAvg = noisySequence.Scan(new List<double>(), 
(buffer, value)=> 
{ 
    buffer.Add(value); 
    if(buffer.Count>MaxSize) 
    { 
     buffer.RemoveAt(0); 
    } 
    return buffer; 
}).Select(buffer=>buffer.Average()); 

Ma si potrebbe utilizzare Window (che Buffer è una sorta di generalizzazione di) per ottenere il vostro media troppo.

noisySequence.Window(10) 
    .Select(window=>window.Average()) 
    .SelectMany(averageSequence=>averageSequence); 
+0

Tangenziale: i tuoi articoli su 'Window',' Buffer', 'Join' e' GroupJoin' erano estremamente illuminanti - in effetti, credo di aver "citato" un certo numero di volte rispondendo alle domande relative a Rx. :) – JerKimball

1

Grazie! Ecco una soluzione usando Scan

const double lambda = 0.99; 
    IObservable<double> emaSequence = noisySequence.Scan(Double.NaN, (emaValue, value) => 
     { 
      if (Double.IsNaN(emaValue)) 
      { 
       emaValue = value; 
      } 
      else 
      { 
       emaValue = emaValue*lambda + value*(1-lambda); 
      } 
      return emaValue; 
     }).Select(emaValue => emaValue); 
Problemi correlati