2012-05-03 11 views
5

Ho una vasta collezione di classe semplice coppia:usa RX per attivare eventi in momenti diversi?

public class Pair { public DateTime Timestamp; public double Value; } 

Essi sono ordinati per il Timestamp ascendente. Voglio attivare un evento con il valore (ad esempio, azione <doppio>) per ciascun elemento nell'elenco al momento opportuno. I tempi sono passati, quindi ho bisogno di normalizzare i timestamp in modo tale che il primo della lista sia "now". Possiamo impostarlo con le Reactive Extensions in modo tale da attivare l'evento successivo dopo la differenza di tempo tra due elementi?

+0

Hai dato un'occhiata a http://reactiveproperty.codeplex.com/? – dwerner

risposta

6

Say pairs è la sequenza:

var obs = pairs.OrderBy(p => p.Timestamp).ToObservable(); 

Ora obs è coppie come un'osservabile ordinato.

Observable.Zip(
    obs, 
    obs.Take(1).Concat(obs), 
    (pair1, pair2) => Observable.Timer(pair1.Timestamp - pair2.Timestamp) 
     .Select(_ => pair1.Value)) 
.Concat() 
.Subscribe(/* Do something here */); 

Lo zip si occupa di trasformare i tempi assoluti in offset. Che prenderà la sequenza e unirsi con se stesso, ma compensato da uno, come segue

Original 1--2--4--7--11 
Offset 1--1--2--4--7--11 
Joined 0--1--2--3--4 

Questo nuovo valore viene poi messo in Observable.Timer ritardarla la quantità appropriata. L'ultimo Concat appiattisce il risultato di un IObservable<IObservable<double>> in un IObservable<double>. Questo presuppone che la sequenza sia ordinata.

+0

Bella soluzione. Aggiungerei 'var orderedObs = pairs.OrderBy (p => p.Timestamp) .ToObservable()' per rendere evidente ciò che deve accadere e usarlo. Ho apportato queste modifiche .. – yamen

+0

Questo aiuta molto. L'ho usato per interrogare i dati storici e riprodurli come se fossero originariamente registrati. Un simulatore per dimostrare che il nuovo sistema funziona. –

+0

Mi ci è voluto un po 'per capire cosa stava succedendo esattamente, ma ora capisco. Rx è un mindf ***. Ottima soluzione però. +1 – BFree

0

Penso che questo problema sia interessante, questo sarebbe il mio primo tentativo.

static void RunPairs(IEnumerable<Pair> pairs, Action<double> pairEvent) 
{ 
    if (pairs == null || !pairs.Any() || pairEvent == null) 
    return; 

    // if we can promise the pairs are already sorted 
    // obviously we don't need this next line 
    pairs = pairs.OrderBy(p => p.Timestamp); 
    var first = pairs .First().Timestamp; 
    var wrapped = pairs.Select(p => new { Offset = (p.Timestamp - first), Pair = p }); 

    var start = DateTime.Now; 

    double interval = 250; // 1/4 second 
    Timer timer = new Timer(interval); 

    timer.AutoReset = true; 
    timer.Elapsed += (sender, elapsedArgs) => 
    { 
    var signalTime = elapsedArgs.SignalTime; 
    var elapsedTime = (signalTime - start); 

    var pairsToTrigger = wrapped.TakeWhile(wrap => elapsedTime > wrap.Offset).Select(w => w.Pair); 
    wrapped = wrapped.Skip(pairsToTrigger.Count()); 

    if (!wrapped.Any()) 
     timer.Stop(); 

    foreach (var pair in pairsToTrigger) 
     pairEvent(pair.Value);  
    }; 

    timer.Start(); 
} 
+0

Questo è davvero inutilmente complesso dato che Rx ha estensioni come 'Timer',' Defer' e 'Delay'. – yamen

+0

@yamen Non ho mai usato Rx, non ho nemmeno intenzione di farlo. Volevo rispondere a come farlo da zero come una sfida perché pensavo che fosse intersecante :) scusa se la mia risposta in questo contesto è semplicemente spam. – payo

+2

Non c'è bisogno di scusarsi, spero che impari qualcosa dalle soluzioni Rx sopra. La tua risposta in realtà serve da esempio sul perché Rx è fantastico :-) – yamen

2

Se per "usando Rx" mi permette di usare solo i pianificatori Rx, allora questa è una soluzione molto semplice:

Action<double> action = 
    x => 
     Console.WriteLine(x); 

var ts0 = pairs.Select(p => p.Timestamp).Min(); 

pairs 
    .ForEach(p => 
     Scheduler 
      .ThreadPool 
      .Schedule(
       p.Timestamp.Subtract(ts0), 
       () => action(p.Value))); 

Questo utilizza la System.Interactive estensione ForEach, ma si potrebbe utilizzare un ciclo regolare foreach per caricare lo scheduler.

Ho testato il codice con i seguenti dati dummy:

var pairs = new [] 
{ 
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 30), Value = 1.1, }, 
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 45), Value = 1.2, }, 
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 40), Value = 1.3, }, 
}; 

Spero che questo aiuta.

+0

Lo scheduler ha una propria coda? O questo codice potrebbe rompere l'intero threadpool? Sono solo preoccupato per la scalabilità di questa soluzione. – Brannon

+0

@Brannon - Se ricordo correttamente gli scheduler usano internamente un ordinamento heap per mettere in coda le azioni.Inoltre, uno schedulatore eseguirà solo un'azione alla volta e riutilizzerà il thread corrente se un'altra azione è immediatamente pronta per essere eseguita. Quindi usano sempre solo un thread alla volta. – Enigmativity

Problemi correlati