2012-10-08 9 views
7

Dato un fonte osservabile, generato mediante il polling (variazioni di a) stato di un dispositivo di basso livello ...Rx quadro: eseguire un'azione su timeout senza interrompere la sequenza originale osservabile

// observable source metacode: 
IObservable<DeviceState> source = Observable.Interval(TimeSpan.FromSeconds(0.5)) 
    .Select(tick => new DeviceState(_device.ReadValue())) 
    .DistinctUntilChanged(); 

.. . e un consumatore che aggiorna l'interfaccia utente ...

// UI metacode: 
service.GetObservableDeviceStates() 
    .Subscribe(state => viewModel.CurrentState = state.ToString()); 

... ho bisogno di eseguire un'azione personalizzata dopo x secondi di "inattività" della fonte, senza interrompere l'abbonamento alla sorgente. Qualcosa del genere:

// UI metacode: 
service.GetObservableDeviceStates() 
    .DoOnTimeout(TimeSpan.FromSeconds(x),() => viewModel.CurrentState = "Idle") 
    .Subscribe(state => viewModel.CurrentState = state.ToString()); 

Quali sono le migliori pratiche? Possibili soluzioni che vengono in mente sono (io sono un noob Rx):

  1. Buffer (anche se non è così leggibile)
  2. Giocando intorno this Timeout overload;
  3. Tornando qualcosa di speciale "service-side", quando nulla cambia (invece di utilizzare DistinctUntilChanged) e trattare con esso sul codice utente:

    service.GetObservableDeviceStates() .Iscriviti (Stato => viewModel.CurrentState = state.Special? "Idle": state.ToString());

EDIT: come riportato in the answer, la soluzione è:

 service.GetObservableDeviceStates() 
      .Do(onNext) 
      .Throttle(TimeSpan.FromSeconds(x)) 
      .Subscribe(onTimeout); 

EDIT2 (Warning)

Se i componenti aggiornamenti dell'interfaccia utente onNext e onTimeout, per evitare CrossThreadExceptions due ObserveOn (uiSynchronizationContext) sono necessari, dal momento che Throttle funziona su un altro thread!

 service.GetObservableDeviceStates() 
      .ObserveOn(uiSynchronizationContext) 
      .Do(onNext) 
      .Throttle(TimeSpan.FromSeconds(x)) 
      .ObserveOn(uiSynchronizationContext) 
      .Subscribe(onTimeout); 
+0

È possibile specificare lo scheduler thread UI come parametro a manetta, al fine di eseguire i timer di limitazione lì, in caso si vuole evitare l'hop ObserveOn aggiuntivo. –

+0

@BartDeSmet Ho già provato Throttle (x, Scheduler.CurrentThread), ma stavo perdendo la reattività dell'interfaccia utente ... Nel mio caso, ObserveOn ha funzionato meglio – Notoriousxl

risposta

7

Timeout è più o meno significato per osservabili che rappresentano operazioni asincrone singoli - per esempio, per restituire un valore predefinito o OnError se detta osservabile non ha notificato in una certa quantità di tempo.

L'operatore che stai cercando è Throttle, anche se all'inizio potrebbe non sembrare così. Throttle(p) fornisce uno stream che produce un valore quando il flusso di origine non ha prodotto un valore per il periodo p.

Parallelamente al codice esistente, è possibile utilizzare source.Throttle(period).Do(...side effect).

+0

Funziona come un incantesimo, grazie! Ho sempre usato Throttle per scartare gli elementi (e per consumare quelli rimanenti sull'abbonamento finale), per non chiamare mai una callback "onPause" :) – Notoriousxl

5

Personalmente eviterei il metodo Do per questo. Rende il codice in questo esempio abbastanza facile, ma trovo che una volta che l'uso di "Do" si intrufola nel codice base, hai presto degli spaghetti.

Si potrebbe anche considerare l'utilizzo di combinazioni di Amb, Timer, TakeUntil, Throttle ecc per ottenere il risultato che si sta cercando e mantenendo il Monad *. O in termini semplici, presumo che tu voglia idealmente avere una sequenza dei valori di stato che passa e non richiede la necessità di inserire un timer nel tuo codice (cioè scaricarlo sul servizio).

public IObservable<DeviceStatus> GetObservableDeviceStates(TimeSpan silencePeriod) 
{ 
    return Observable.Create<DeviceStatus>(
    o=> 
    { 
     var idle = Observable.Timer(silencePeriod).Select(_=>new DeviceStatus("Idle")); 

     var polledStatus = Observable.Interval(TimeSpan.FromSeconds(0.5)) 
         .Select(tick => new DeviceStatus(_device.ReadValue())) 
         .DistinctUntilChanged() 
         .Publish(); 

     var subscription = (from status in polledStatus 
          from cont in Observable.Return(status).Concat(idle.TakeUntil(polledStatus)) 
          select cont) 
        .Subscribe(o); 

     return new CompositeDisposable(subscription, polledStatus.Connect()); 
    }); 
} 

Questo codice ha ora il servizio che restituisce un valore di stand-by Quando si è verificato il periodo di cambiamento di silenzio specificato.

Questo significa che il codice di interfaccia utente meta rimane semplice e la logica relativa a devicestatus rimane al suo posto

// UI metacode: 
service.GetObservableDeviceStates(TimeSpan.FromSeconds(2)) 
    .Subscribe(state => viewModel.CurrentState = state.ToString()); 
+1

+1 perché non si può sottolineare abbastanza che 'Do' è un certo tipo del male Penso che 'Timestamp' |>' CombineLatest' potrebbe essere un po 'più performante in questo caso, però. – Asti

+0

Senti? Lavoriamo in una scienza applicata. Basta fornire una prova. Dichiarare le raccomandazioni di prestazione basate sui sentimenti è un problema nel nostro settore che deve fermarsi. –

+0

Santa negromanzia, Batman! Punto preso Lee. Sarò una persona migliore per questo. :) – Asti

Problemi correlati