2013-03-06 7 views
7

Ho creato un operatore SlidingWindow() per estensioni reattive perché voglio monitorare facilmente cose come medie mobili, ecc. Come un semplice esempio, voglio iscriversi per ascoltare gli eventi del mouse, ma ogni volta c'è un evento che voglio ricevere gli ultimi tre (invece di aspettare ogni terzo evento per ricevere gli ultimi tre). Ecco perché gli overload di Window che ho trovato non sembrano darmi quello che mi serve fuori dalla scatola.Problemi nell'implementazione di una finestra scorrevole in Rx

Questo è quello che mi è venuto in mente. Temo che potrebbe non essere la soluzione più performante, date le sue frequenti operazioni List:

public static IObservable<List<T>> SlidingWindow<T>(this IObservable<T> seq, int length) 
{ 
    var seed = new List<T>(); 

    Func<List<T>, T, List<T>> accumulator = (list, arg2) => 
    { 
     list.Add(arg2); 

     if (list.Count > length) 
      list.RemoveRange(0, (list.Count - length)); 

     return list; 
    }; 

    return seq.Scan(seed, accumulator) 
       .Where(list => list.Count == length); 
} 

Può essere chiamato in questo modo:

var rollingSequence = Observable.Range(1, 5).SlidingWindow().ToEnumerable(); 

Tuttavia, con mia grande sorpresa, invece di ricevere il risultati attesi

1,2,3 
2,3,4 
3,4,5 

ricevo i risultati

2,3,4 
3,4,5 
3,4,5 

Qualsiasi approfondimento sarebbe molto apprezzato!

risposta

5

Prova a modificare - avrei dovuto sedersi e avere un pensare che è la performance relativa, ma è almeno probabile come buono, e il modo più facile da leggere: rig

public static IObservable<IList<T>> SlidingWindow<T>(
     this IObservable<T> src, 
     int windowSize) 
{ 
    var feed = src.Publish().RefCount();  
    // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list 
    return Observable.Zip(
     Enumerable.Range(0, windowSize) 
      .Select(skip => feed.Skip(skip)) 
      .ToArray()); 
} 

test :

var source = Observable.Range(0, 10); 
var query = source.SlidingWindow(3); 
using(query.Subscribe(Console.WriteLine)) 
{    
    Console.ReadLine(); 
} 

uscita:

ListOf(0,1,2) 
ListOf(1,2,3) 
ListOf(2,3,4) 
ListOf(3,4,5) 
ListOf(4,5,6) 
... 

EDIT: Come parte, mi ritrovo compulsivamente a .Publish().RefCount() da quando sono stato bruciato una volta non facendolo ... Non penso sia strettamente necessario qui, comunque.

EDIT per yzorg:

Se aumentare il metodo in questo modo, si vedrà il comportamento runtime più chiaramente:

public static IObservable<IList<T>> SlidingWindow<T>(
    this IObservable<T> src, 
    int windowSize) 
{ 
    var feed = src.Publish().RefCount();  
    // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list 
    return Observable.Zip(
    Enumerable.Range(0, windowSize) 
     .Select(skip => 
     { 
      Console.WriteLine("Skipping {0} els", skip); 
      return feed.Skip(skip); 
     }) 
     .ToArray()); 
} 
+0

@blaster Nessun problema: in effetti, grazie per "avermi fatto", lo ho scritto, poiché l'ho usato io stesso un paio di volte da quando ho risposto a questo. ;) – JerKimball

+0

Non penso che questo sia buono. I file .Publish(), .Range (0, x) e .Skip() - quando questi sono combinati, sembrano prestazioni scadenti, in particolare O n^2, perché Skip sta andando a ripetere l'intero flusso ripetutamente.Ad esempio, è necessario iterare 30.000 interi per ottenere (10000, 10001, 10002). Quindi in realtà non stai mantenendo un buffer scorrevole del flusso sorgente in memoria, dovresti mantenere l'intero stream sorgente (dall'inizio del tempo) in memoria, che è quello che pensavo stessimo evitando. – yzorg

+0

@yzorg controlla la modifica – JerKimball

9

Utilizzando il test originale, con un argomento di 3 per il conteggio, questo dà i risultati desiderati:

public static IObservable<IList<T>> SlidingWindow<T>(
    this IObservable<T> source, int count) 
{ 
    return source.Buffer(count, 1) 
       .Where(list => list.Count == count); 
} 

test come questo:

var source = Observable.Range(1, 5); 
var query = source.SlidingWindow(3); 
using (query.Subscribe(i => Console.WriteLine(string.Join(",", i)))) 
{ 

} 

uscita:

1,2,3 
2,3,4 
3,4,5 
6

Proprio source.Window(count, 1) - o source.Buffer(count, 1) che si tratti di una finestra/del buffer di oggetti "contare", scorrevole per uno.

Problemi correlati