2014-07-10 9 views
10

Sto cercando un modo elegante per creare un Observable da un semplice delegato di richiamata con Rx, qualcosa di simile a Observable.FromEventPattern?Osservabile per una richiamata in Rx

Dire, sto avvolgendo API Win32 EnumWindows che richiama il EnumWindowsProc che fornisco.

So che potrei creare un adattatore di eventi C# temporaneo per questo callback e passarlo a FromEventPattern. Inoltre, potrei probabilmente implementare manualmente IObservable, quindi chiamerebbe IObserver.OnNext dal mio callback EnumWindowsProc.

C'è un modello esistente per il wrapping di un callback in Rx che mi manca?

+1

Oppure puoi semplicemente usare un 'Subject' e chiamare' OnNext' su di esso nella richiamata. – Dirk

+0

@Dirk, interessante, grazie. Quindi, 'Subject.OnNext' quindi' Subject.OnComplete' quando non ci sono più elementi? – avo

+1

Sì, il soggetto implementa sia 'IObservable' che' IObserver'. Chiama OnNext/OnError/OnCompleted per inviare tali comandi agli utenti iscritti all'oggetto.Funzionano come una sorta di gateway da codice non-Rx a Rx. – Dirk

risposta

8

È possibile utilizzare uno Subject<T> che può essere utilizzato per passare dal mondo di programmazione imperativo al mondo funzionale di Rx.

Subject<T> implementa sia IObservable<T> e IObserver<T>, in modo da poter chiamare i suoi OnNext, OnError e OnCompleted metodi e gli iscritti riceveranno una notifica.

Se si desidera esporre la Subject<T> come una proprietà, allora si dovrebbe fare in modo da utilizzare .AsObservable() come questo nasconde il fatto che il IObservable<T> è in realtà un Subject<T>. Rende impossibile cose come ((Subject<string>) obj.Event).OnNext("Foo").

+0

Questa probabilmente è la strada da percorrere. Mi piacerebbe solo vedere cosa potrebbero dire gli altri, prima di accettarlo. – avo

+1

@avo È certamente una soluzione semplice, ma alcune persone non amano l'uso di Subject in quanto questa è una delle poche cose non funzionali in Rx. Penso che abbia alcuni buoni usi, ma se c'è un altro modo - come FromEventPattern - allora lo userei invece. – Dirk

3

Ricordare che i callback come quello utilizzato in EnumWindows sono leggermente diversi da Rx. Nello specifico, il callback può comunicare al chiamante attraverso il suo valore di ritorno. Gli osservatori Rx non possono farlo. Inoltre, i callback possono ricevere più parametri, ma gli osservatori Rx ricevono un singolo valore. Quindi è necessario avvolgere i parametri multipli in un singolo oggetto.

Con questo in mente, un'alternativa all'uso di Subject è quella di utilizzare Observable.Create. In questo modo si registra solo la richiamata quando c'è effettivamente un osservatore e si annulla la registrazione se tale osservatore annulla la sottoscrizione.

Per l'API sincrono hai utilizzato un esempio, potresti fare qualcosa di simile. Nota in questo esempio non esiste in realtà un modo per annullare la registrazione del mid-stream di callback poiché tutto avviene in modo sincrono prima di poter mai restituire l'eliminabile di annullamento dell'iscrizione.

public static IObservable<Foo> WrapFooApi(string arg1, string arg2) 
{ 
    return Observable.Create<Foo>(observer => 
    { 
     FooApi.enumerate(arg1, arg2, e => 
     { 
      observer.OnNext(new Foo(e)); 
      return true; 
     }); 

     // In your case, FooApi.enumerate is actually synchronous 
     // so when we get to this line of code, we know 
     // the stream is complete. 
     observer.OnCompleted(); 
     return Disposable.Empty; 
    }); 
} 

// Usage 
WrapFooApi("a", "b").Take(1).Subscribe(...); // only takes first item 

possiamo risolvere il problema con l'essere in grado di fermare presto con l'introduzione di un po 'di asincronia, che darà il tempo dell'osservatore per ottenere un usa e getta che si può disporre per informare l'utente. Possiamo usare CreateAsync per ottenere un CancellationToken che annullerà quando l'osservatore annulla la sottoscrizione. E siamo in grado di eseguire il codice FooApi all'interno Task.Run:

public static IObservable<Foo> WrapFooApi(string arg1, string arg2) 
{ 
    return Observable.CreateAsync<Foo>(async (observer, ct) => 
    { 
     await Task.Run(() => FooApi.register_callback(arg1, arg2, e => 
     { 
      observer.OnNext(e); 

      // Returning false will stop the enumeration 
      return !ct.IsCancellationRequested; 
     })); 
     observer.OnCompleted(); 
    }); 
} 

In un più tradizionale API callback asincroni, dove si registra ad un certo punto e annullare la registrazione in un altro punto, si potrebbe avere qualcosa di più simile a questo:

public static IObservable<Foo> WrapFooApi(string args) 
{ 
    return Observable.Create<Foo>(observer => 
    { 
     FooToken token = default(FooToken); 
     var unsubscribe = Disposable.Create(() => FooApi.Unregister(token)); 
     token = FooApi.Register(args, e => 
     { 
      observer.OnNext(new Foo(e)); 
     }); 

     return unsubscribe; 
    }); 
} 
+1

Non credo che in questo caso particolare sia possibile utilizzare API asincrone per fare ciò: il callback EnumWindows viene richiamato in modo sincrono dal chiamante (ad esempio, l'API EnumWindows utilizza solo un ciclo for e chiama il callback per ciascun elemento) –

Problemi correlati