2013-10-05 11 views
5

Sto iniziando lo sviluppo con estensioni Reactive (versione 2.1, per ogni evenienza) e per la mia applicazione di esempio ho bisogno di una sequenza di valori int spinti con un certo intervallo , cioè ogni 1 secondo.Come incollare il ritardo relativo nella sequenza osservabile con Rx (Estensioni reattive)

So che posso creare una sequenza con Observable.Range<int>(0,10) ma non riesco a capire come impostare il tempo relativo tra le spinte. Ho provato Delay() ma sposta la sequenza solo una volta all'inizio.

Ho poi trovato Observable.Generate() metodo che potrebbe essere adattato a questo compito in modo seguente:

var delayed = Observable. 
       Generate(0, i => i <= 10, i => i + 1, i => i, 
          i => TimeSpan.FromSeconds(1)); 

Ma che sembra funzionare solo per semplice 'for-each-like' sequenze definite. Quindi, in generale, la mia domanda è, se siamo in grado di ottenere qualsiasi sequenza sorgente e avvolgerla con qualche proxy che estrae i messaggi dall'origine e lo spinga ulteriormente con un ritardo temporale?

S--d1--d2--d3--d4--d5-| 
D--d1-delay-d2-delay-d3-delay-d4-delay-d5-| 

P.S. Se questo approccio è in contraddizione con il concetto di ReactiveExtensions, si prega di notare anche questo. Non voglio farlo "con tutti i mezzi" e in futuro otterranno altri problemi di progettazione.

P.P.S idea generale è di assicurarsi che sequenza uscita ha un intervallo specificato tra eventi nonostante se la sequenza di ingresso è finita o infinita e come spesso si spinge eventi.

risposta

7

Observable.Interval è ciò che si desidera guardare. Si genererà un valore a lungo 0 sulla base di incrementare di 1 ogni intervallo di tempo che si specifica ad es .:

Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(x => Console.WriteLine(x)); 

È quindi possibile utilizzare una proiezione (Select) per compensare/modificare questo valore in base alle esigenze.

È anche possibile "stimolare" un flusso con un altro utilizzando l'operatore Zip: è possibile che si desideri esaminare anche questo. Zip raggruppa eventi di due flussi insieme, quindi emette al ritmo del flusso attualmente più lento. Zip è anche abbastanza flessibile, può comprimere qualsiasi numero di stream, o persino comprimere un IO in un IEnumerable. Ecco un esempio di questo:

var pets = new List<string> { "Dog", "Cat", "Elephant" }; 
var pace = Observable.Interval(TimeSpan.FromSeconds(1)) 
    .Zip(pets, (n, p) => p)  
    .Subscribe(x => Console.WriteLine(x),() => Console.WriteLine("Done")); 

Questo scrive gli animali domestici a un intervallo di 1 secondo.

Alla luce del P.P.S. aggiunto sopra, darò un'altra risposta - Lascio questo per riferimento in quanto è comunque una tecnica utile.

+1

Grazie per la risposta. L'hack con Zip() funziona quasi bene. Una pecca qui, è se la sequenza di 'input' è infinita e imprevedibile di tempo (cioè l'evento di clic del pulsante). Con esso, se il pulsante non è stato cliccato per - diciamo 10 secondi - non ci sono stati eventi in "input", ma la sequenza "timer" ha prodotto 10 eventi. Quindi se inizio a fare clic sul pulsante velocemente, tutti i nuovi eventi da "input" vengono compressi a coppie con gli eventi esistenti da "timer", quindi anche la sequenza "result" verrà tirata con la velocità di "input", che non è quella che volere. – Antonio

+0

Aggiungerei check come: _in nuovo evento 't_i' in' timer', controlla se c'è un evento in 'input' e rilascia' t_i' se non ci sono. – Antonio

+0

Il bit su come fare clic su un pulsante è tutto notizie per me :) Non era nella tua domanda. Forse puoi riformulare il problema esclusivamente in termini di dati reali coinvolti e esperienza utente desiderata? –

2

Quindi, per chiarire, si desidera che l'uscita spinga l'input ad una frequenza non più veloce dell'intervallo, ma altrimenti il ​​più velocemente possibile.

In tal caso, prova questo. La costruzione variabile input è un modo hacky per creare una sequenza sporadica breve che a volte è più veloce e talvolta più lenta di un ritmo di 2 secondi. Notare che l'uscita dal cronometro mostrerà le piccole imprecisioni nel meccanismo del timer utilizzato da Rx.

var input = Observable.Interval(TimeSpan.FromSeconds(1)).Take(4); 
input = input.Concat(Observable.Interval(TimeSpan.FromSeconds(5)).Take(2)); 

var interval = TimeSpan.FromSeconds(2); 

var paced = input.Select(i => Observable.Empty<long>() 
             .Delay(interval) 
             .StartWith(i)).Concat(); 

var stopwatch = new Stopwatch(); 
stopwatch.Start(); 
paced.Subscribe(
    x => Console.WriteLine(x + " " + stopwatch.ElapsedMilliseconds), 
    () => Console.WriteLine("Done")); 

Questo esempio funziona proiettando ogni tick dall'ingresso in una sequenza che ha la zecca come un singolo evento in partenza, ma non onComplete per l'intervallo desiderato. Il flusso di flussi risultante viene quindi concatenato. Questo approccio garantisce che i nuovi tick vengano emessi immediatamente se l'output è attualmente "svuotato", ma bufferato uno dietro l'altro altrimenti.

È possibile eseguire il wrapping in un metodo di estensione per renderlo generico.

0

Ecco l'approccio più semplice per fare ciò che si vuole:

var delayed = 
    source.Do(x => Thread.Sleep(1000)); 

Si aggiunge un secondo di ritardo, ma lo fa lo fa prima del primo elemento. Potresti certamente concludere una logica per non ritardare all'inizio. Non sarebbe troppo difficile.


Ecco un'alternativa che pianifica il ritardo su una tendenza completamente nuova.

var delayed = 
    Observable.Create<int>(o => 
    { 
     var els = new EventLoopScheduler(); 
     return source 
      .ObserveOn(els) 
      .Do(x => els.Schedule(() => Thread.Sleep(1000))) 
      .Subscribe(o); 
    }); 
+0

Devo dire che non sono un fan di questo approccio per alcuni motivi. Non esegue il test unitario - diversamente dal ritardo non è possibile parametrizzare uno scheduler in attesa. Blocca anche un filo inutilmente.Utilizza anche Do, che viene eseguito per abbonato, in modo da offrire l'opportunità di effetti collaterali inattesi (immagina un operatore downstream che effettua un doppio abbonamento). Infine, come risultato di quanto sopra, non è un idioma Rx –

+0

@JamesWorld - Sì, sono d'accordo che la mia prima opzione non è la migliore. Il secondo è in qualche modo migliore in quanto non blocca alcun thread esistente. – Enigmativity

0

Forse quello che stai cercando è il metodo di estensione Buffer. È firma viene definito così:

public static IObservable<IList<TSource>> Buffer<TSource>(
    this IObservable<TSource> source, 
    TimeSpan timeSpan) 

trasformerà sequenza sorgente in modo che i valori sono prodotte in serie con la frequenza timeSpan.

0

So che questa è una vecchia domanda ma penso di avere la risposta corretta.

Zippare un Observable.Timer produce "tick" anche se la sorgente Observable non produce nulla. Ciò significa che, una volta che la fonte produce un altro oggetto, verranno utilizzate le zecche già prodotte e non ancora consumate. Risultante in un osservabile che aggiungerà un ritardo tra gli articoli quando il produttore produce articoli ad un ritmo costante, ma che produrrà esplosioni di articoli se il produttore a volte impiega più tempo a produrre un oggetto.

Per aggirare questo è necessario generare un timer, che produce un solo oggetto, tra ogni oggetto prodotto dal vostro osservabile. È possibile farlo con Observable.Switch come questo:

var subject = new Subject<Unit>(); 

     var producer = subject.SelectMany(
            _ => 
            { 
             return new[] 
             { 
              Observable.Return(true), 
              Observable.Timer(TimeSpan.FromSeconds(2)) 
                .Select(q => false) 
             }; 
            }) 
           .Switch();    
Problemi correlati