2014-07-03 11 views
7

Sto cercando di utilizzare Reactive Extensions (Rx) per un'attività in cui sembra essere una buona idea, eseguire il polling a un intervallo specifico di un servizio Web e visualizzarne l'ultimo x risultati.Eseguire il polling di un servizio Web utilizzando le estensioni reattive e vincolare gli ultimi x risultati

Ho un servizio Web che mi invia lo stato di uno strumento che voglio monitorare. Vorrei eseguire il polling di questo strumento a una frequenza specifica e visualizzare in un elenco gli ultimi 20 stati che sono stati interrogati.

Quindi la mia lista sarebbe come una "finestra mobile" del risultato del servizio.

Sto sviluppando un'applicazione WPF con Caliburn.Micro, ma non penso che questo sia molto rilevante.

Cosa sono riuscito ad ottenere fino ad ora è la seguente (solo un app campione che ho inciso in fretta, non ho intenzione di fare questo nel ShellViewModel nel vero app):

public class ShellViewModel : Caliburn.Micro.PropertyChangedBase, IShell 
{ 
    private ObservableCollection<string> times; 
    private string currentTime; 

    public ShellViewModel() 
    { 
     times = new ObservableCollection<string>(); 

     Observable 
      .Interval(TimeSpan.FromSeconds(1)) 
      .SelectMany(x => this.GetCurrentDate().ToObservable()) 
      .ObserveOnDispatcher() 
      .Subscribe(x => 
      { 
       this.CurrentTime = x; 
       this.times.Add(x); 
      }); 
    } 

    public IEnumerable<string> Times 
    { 
     get 
     { 
      return this.times; 
     } 
    } 

    public string CurrentTime 
    { 
     get 
     { 
      return this.currentTime; 
     } 
     set 
     { 
      this.currentTime = value; 
      this.NotifyOfPropertyChange(() => this.CurrentTime); 
     } 
    } 

    private async Task<string> GetCurrentDate() 
    { 
     var client = new RestClient("http://www.timeapi.org"); 
     var request = new RestRequest("/utc/now.json"); 

     var response = await client.ExecuteGetTaskAsync(request); 

     return response.Content; 
    } 
} 

Nel view Ho solo un'etichetta legata alla proprietà e un elenco associato alla proprietà Times.

Il problema che ho è:

  • Non è limitato ai 20 elementi nella lista come ho sempre aggiungere elementi alla ObservableCollection, ma non riesco a trovare un modo migliore per DataBind
  • L'Intervallo non funziona come vorrei. Se l'interrogazione richiede più di 1 secondo per essere eseguita, due query verranno eseguite in parallelo, cosa che non mi piacerebbe accadere. Il mio obiettivo sarebbe che la query si ripeta indefinitamente ma a un ritmo di non più di 1 query ogni secondo. Se una query fa terminare più di 1 secondo, dovrebbe attendere che finisca e attiva direttamente la nuova query.

Seconda modifica:

modifica precedente di seguito è stato me di essere stupido e molto confuso, si innesca gli eventi in modo continuo, perché è qualcosa di intervallo continuo che non finisce mai. La soluzione di Brandon è corretta e funziona come previsto.

Edit:

Sulla base l'esempio di Brandon, ho cercato di fare il seguente codice nel LINQPad:

Observable 
    .Merge(Observable.Interval(TimeSpan.FromSeconds(2)), Observable.Interval(TimeSpan.FromSeconds(10))) 
    .Repeat() 
    .Scan(new List<double>(), (list, item) => { list.Add(item); return list; }) 
    .Subscribe(x => Console.Out.WriteLine(x)) 

E posso vedere che la scrittura per la console avviene ogni 2 secondi, e non ogni 10. Quindi la ripetizione non attende che entrambi gli osservabili finiscano prima di ripetere.

risposta

6

Prova questo:

// timer that completes after 1 second 
var intervalTimer = Observable 
    .Empty<string>() 
    .Delay(TimeSpan.FromSeconds(1)); 

// queries one time whenever subscribed 
var query = Observable.FromAsync(GetCurrentDate); 

// query + interval timer which completes 
// only after both the query and the timer 
// have expired 

var intervalQuery = Observable.Merge(query, intervalTimer); 

// Re-issue the query whenever intervalQuery completes 
var queryLoop = intervalQuery.Repeat(); 

// Keep the 20 most recent results 
// Note. Use an immutable list for this 
// https://www.nuget.org/packages/microsoft.bcl.immutable 
// otherwise you will have problems with 
// the list changing while an observer 
// is still observing it. 
var recentResults = queryLoop.Scan(
    ImmutableList.Create<string>(), // starts off empty 
    (acc, item) => 
    { 
     acc = acc.Add(item); 
     if (acc.Count > 20) 
     { 
      acc = acc.RemoveAt(0); 
     } 

     return acc; 
    }); 

// store the results 
recentResults 
    .ObserveOnDispatcher() 
    .Subscribe(items => 
    { 
     this.CurrentTime = items[0]; 
     this.RecentItems = items; 
    }); 
+0

risposta molto interessante, grazie. Quindi vedo che non esiste un vero altro modo per ottenere l'elenco degli elementi restituiti dalla query piuttosto che scansionare la query e creare un elenco. C'è una ragione per cui non hai usato [TakeLast] (http://msdn.microsoft.com/en-us/library/hh212114 (v = vs.103) .aspx) e ho fatto il controllo nella scansione per i 20 articoli? Inoltre, cosa succede se voglio essere in grado di modificare dinamicamente il ritardo di 1 secondo tra il polling? – Gimly

+0

'TakeLast' prende gli ultimi ** N elementi dallo stream, non gli ultimi ** N elementi. In altre parole, non produrrà nulla finché lo stream * non termina *, quindi produrrà gli ultimi N elementi prodotti. Non molto utile qui. Se si avvolge la definizione di 'intervalTimer' con' Observable.Defer' e si fa leggere una variabile o si richiama una funzione per l'intervallo di tempo, quindi 'Defer' ricostruirà il timer a ogni iterazione, dando la possibilità di modificare il ritardo periodo. – Brandon

+0

Ah, ecco perché i miei test con Take e TakeLast non hanno avuto successo. Grazie ancora per il vostro aiuto. – Gimly

0

Questo dovrebbe saltare i messaggi di intervallo mentre è in corso un GetCurrentDate.

Observable 
    .Interval(TimeSpan.FromSeconds(1)) 
    .GroupByUntil(p => 1,p => GetCurrentDate().ToObservable().Do(x => { 
          this.CurrentTime = x; 
          this.times.Add(x); 
         })) 
    .SelectMany(p => p.LastAsync()) 
    .Subscribe(); 
Problemi correlati