8

Sono un po 'nuovo per Rx.NET. È possibile rilevare un'eccezione che può essere lanciata da uno qualsiasi degli abbonati? Prendi il seguente ...Eccezioni di cattura che possono essere generate da un abbonamento OnNext Azione

handler.FooStream.Subscribe(
      _ => throw new Exception("Bar"), 
      _ => { }); 

Attualmente sto prendendo in considerazione una base per abbonamento con un'istanza di quanto segue. L'implementazione della quale utilizza solo un ManualResetEvent per riattivare un thread in attesa.

public interface IExceptionCatcher 
{ 
    Action<T> Exec<T>(Action<T> action); 
} 

e usarlo in questo modo ...

handler.FooStream.Subscribe(
      _exceptionCatcher.Exec<Foo>(_ => throw new Exception("Bar")), //It's disappointing that this generic type can't be inferred 
      _ => { }); 

Mi sento come se ci deve essere un modo migliore. Tutte le funzionalità di gestione degli errori in Rx.NET sono specifiche per la gestione degli errori osservabili?

MODIFICA: Per richiesta, la mia implementazione è https://gist.github.com/1409829 (l'interfaccia e l'implementazione sono separate in diversi assembly nel codice prod). Il feedback è benvenuto Può sembrare sciocco, ma sto usando il windsor del castello per gestire molti abbonati Rx diversi. Questo collettore eccezione viene registrata con il contenitore come questo

windsorContainer.Register(Component.For<IExceptionCatcher>().Instance(catcher)); 

Sarebbe quindi essere utilizzato come questo dove observable è istanza di IObservable:

var exceptionCatcher = 
    new ExceptionCatcher(e => 
           { 
            Logger.FatalException(
             "Exception caught, shutting down.", e); 
            // Deal with unmanaged resources here 
           }, false); 


/* 
* Normally the code below exists in some class managed by an IoC container. 
* 'catcher' would be provided by the container. 
*/ 
observable /* do some filtering, selecting, grouping etc */ 
    .SubscribeWithExceptionCatching(processItems, catcher); 

risposta

8

Il built-in operatori osservabili non fai quello che sei chiedendo per impostazione predefinita (molto simile a eventi), ma si potrebbe fare un metodo di estensione che farebbe questo.

public static IObservable<T> IgnoreObserverExceptions<T, TException>(
           this IObservable<T> source 
           ) where TException : Exception 
{ 
    return Observable.CreateWithDisposable<T>(
     o => source.Subscribe(
      v => { try { o.OnNext(v); } 
        catch (TException) { } 
      }, 
      ex => o.OnError(ex), 
      () => o.OnCompleted() 
      )); 
} 

Quindi qualsiasi osservabile potrebbe essere avvolto da questo metodo per ottenere il comportamento che hai descritto.

+0

Grazie, hai risposto alla mia domanda, ma sei sicuro che il tuo tentativo di aggirare OnNext catturerà eccezioni? Si potrebbe facilmente fare qualcosa con IObservable restituito che causerebbe l'esecuzione del codice sottoscritto su un altro thread. Inizialmente ho provato a provare/catturare la mia chiamata Subject.OnNext ma non sono state rilevate eccezioni. Potrei comunque creare un metodo SubscribeWithExceptionHandling o qualcosa del genere. – drstevens

+1

@drstevens Cattura eccezioni dalla stessa discussione. Se il tuo osservatore sta lanciando proprie operazioni asincrone che generano eccezioni, questo non le catturerà. –

+1

Considerando bene quante delle operazioni Rx generano un nuovo thread (o attività nel pool), direi che è piuttosto probabile che sia così. Tra il 'handler.FooStream' e' Subscribe' in questione c'è un 'GroupByUntil (...). SelectMany (...). Buffer (con un tempo)'. Alla fine ho creato un 'SubscribeWithCatch' seguendo il tuo esempio che cattura Exception e poi usa la stessa azione passata al gestore OnError. – drstevens

Problemi correlati