2012-10-18 13 views
5

Sto sperimentando con Reactive Extensions per recuperare un gruppo di elementi RSS. Mi sono basato su un post sul blog di Tim Greenfield: Silverlight Rx DataClient within MVVM.Rx Riprova() non funziona come previsto

Lo sto utilizzando in un'applicazione desktop, ma il codice è simile.

Il problema riscontrato è la comprensione delle funzioni Retry(). Non sembra fare ciò che mi aspetto e su ciò che mi aspetto.

var items = new List<RssItem>(); 
WebHelper.DownloadXmlFileAsync<RssItem>(new Uri(URI), "item") 
    .Retry(2) 
    .Finally(PublishResults) 
    .Subscribe(items.Add, ProcessError,() => ProcessCompleted(items)); 

Quando passo in un URI valido, questo funziona senza problemi. Quando faccio un refuso nell'URI, riporta un errore 404 tramite la funzione ProcessError(), come ci si aspetterebbe, ma viene segnalato solo una volta. Mi sarei aspettato che mostrasse questo errore due volte.

Quindi sembra che la funzione Retry() non funzioni sulla mia richiesta Web, ma sembra che si applichi effettivamente alle funzioni che vengono passate a Subscribe(). Potrei sbagliarmi qui però.

Come posso essere sicuro che la chiamata Retry() si applichi alla richiesta web?

Codice extra:

public static class WebHelper 
{ 
    public static HttpWebRequest CreateHttp(Uri uri) 
    { 
     return CreateHttp(uri, "GET"); 
    } 

    public static HttpWebRequest CreateHttp(Uri uri, string method) 
    { 
     if (uri.Scheme != Uri.UriSchemeHttp && uri.Scheme != Uri.UriSchemeHttps) 
     { 
      throw new ArgumentException("The specified URI does not use HTTP or HTTPS.", "uri"); 
     } 

     var request = (HttpWebRequest)WebRequest.Create(uri); 
     request.Method = method; 

     return request; 
    } 

    public static IObservable<T> DownloadXmlFileAsync<T>(Uri uri, string elementName) where T : class 
    { 
     return (from request in Observable.Return(CreateHttp(uri)) 
       from response in Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)() 
       let stream = response.GetResponseStream() 
       where stream != null 
       from item in XmlReader.Create(stream).GetXmlItem<T>(elementName).ToObservable() 
       select item); 
    } 
} 

public static class XmlExtensions 
{ 
    public static IEnumerable<T> GetXmlItem<T>(this XmlReader reader, string elementName) where T : class 
    { 
     var serializer = new XmlSerializer(typeof (T)); 
     while (reader.GoToElement(elementName)) 
     { 
      yield return serializer.Deserialize(reader) as T; 
     } 
    } 

    public static bool GoToElement(this XmlReader reader, string elementName) 
    { 
     do 
     { 
      if (reader.NodeType == XmlNodeType.Element && reader.Name == elementName) 
      { 
       return true; 
      } 
     } while (reader.Read()); 

     return false; 
    } 
} 

XmlRoot("item")] 
public class RssItem 
{ 
    [XmlElement("description")] 
    public string Description { get; set; } 

    [XmlElement("link")] 
    public string Link { get; set; } 

    [XmlElement("pubDate")] 
    public string PublishDate { get; set; } 

    [XmlElement("title")] 
    public string Title { get; set; } 

    public override string ToString() 
    { 
     return string.Format("Title: {0}", Title); 
    } 
} 

risposta

11

La grammatica Rx per le sequenze è definito come:

OnNext * (OnError | OnCompleted)?

che assumevano un OnError o un OnCompleted segnali sono attesi alla fine della sequenza e abbonamenti sulla tubazione di essere abbattute.

Nel contesto degli operatori:

observable.Retry(n) è: Re-iscriversi alla observable quando viene ricevuto un OnError, fino a n volte.

observable.Finally(action) è: Eseguire action sulla ricezione OnError|OnCompleted

Retry è destinato ad essere utilizzato con osservabili freddi (Lee Campbell ha a good post on this) dove sottoscrizione determina essenzialmente la sorgente per iniziare.

Analogamente Repeat è esattamente come Retry eccetto che si riattribuisce alla ricezione di OnCompleted.

Per vedere questo in azione, possiamo creare un osservabile che "fallirà" per le prime n volte, e poi avrà successo. Ora per un po 'di codice:

private static IObservable<int> ErrorProducer(int i) 
    { 
     int count = 0; 
     return Observable.Create<int>(observer => 
     { 
      Console.WriteLine("Doing work"); 

      if (count++ < i) 
      { 
       Console.WriteLine("Failed"); 
       observer.OnError(new Exception()); 
      } 
      else 
      { 
       Console.WriteLine("Done"); 
       observer.OnNext(count); 
       observer.OnCompleted();      
      } 
      return Disposable.Empty; 
     }); 
    } 

Per un produttore che non riesce sempre:

 print(ErrorProducer(3).Retry(2)); 

Dà:

Doing work <-- Subscription 
Failed 
Doing work <-- Resubscription 
Failed 
OnError(System.Exception) 
Finally 

Per un produttore che alla fine riesce:

print(ErrorProducer(2).Retry(3)); 

Doing work 
Failed 
Doing work 
Failed 
Doing work 
Done 
OnNext(3) <-- Succeeded 
OnCompleted() 
Finally 

Se volevi y la nostra funzione di errore di processo deve essere richiamata tutte le volte che riprova, dovrebbe essere posizionata prima dello Retry.

cioè seq.Do(value => { }, exception => { }).Retry(n)

Si può leggere su utilizzando osservabili caldo/freddo, e utilizzando il modello asincrono con Rx per chiarire la vostra comprensione.

+2

La tua risposta ha fornito alcune informazioni utili e mi ha anche permesso di cercare su Internet con alcune parole chiave specifiche che hanno dato luogo a http://social.msdn.microsoft.com/Forums/da-DK/rx/thread/96a06e27-9c02-4177 -ae6a-04b8a7f966e5 che mi ha dato un po 'più di intuizione nel modo in cui funzionano gli osservabili. – Jensen

+0

@JensenSomers Contento di essere di aiuto. Mi dispiace non essere molto specifico sulla causa esatta del tuo problema. La documentazione sull'applicazione di Rx è scarsa e spero che queste risposte generali possano essere utili a qualcuno che sta cercando di imparare Rx, in futuro. – Asti

+0

Se tutti quanti saranno entusiasti di Rx come sono, sono sicuro che nel prossimo futuro apparirà una documentazione adeguata e esempi di casi d'uso più ampi. :-) – Jensen

4

La risposta di Asti è azzeccata. Volevo solo aggiungere alcune informazioni aggiuntive nel caso in cui volessi sapere come esporre più errori per una singola sequenza logica.

Come osserva Asti, è possibile terminare una sequenza solo una volta. Questa terminazione può essere un errore o un completamento (OnError | OnCompleted).

Tuttavia non c'è niente che ti impedisce di avere sequenze osservabili nidificate! Se si desidera visualizzare più messaggi di errore, prendere in considerazione uno scenario in cui è stato restituito uno IObservable<IObservable<T>>. La sequenza interna è la sequenza di dati (la sequenza che hai attualmente). Quando questa sequenza Errori non può più essere utilizzata, la sequenza Outer potrebbe produrre una nuova sequenza di dati interna.

Questo può sembrare un po 'strano, ma è un concetto supportato in Rx in quanto operatori come Merge e Switch gestiscono già queste sequenze nidificate. Questo stile di Rx viene toccato nel mio libro, IntroToRx nel paragrafo Nested Sequences e poi di nuovo in maggior dettaglio nel Sequences of Coincidence capitolo

Spero che questo ti aiuti a vedere altre possibilità su come utilizzare Rx in futuro.

Problemi correlati