Ricordare che i callback come quello utilizzato in EnumWindows
sono leggermente diversi da Rx. Nello specifico, il callback può comunicare al chiamante attraverso il suo valore di ritorno. Gli osservatori Rx non possono farlo. Inoltre, i callback possono ricevere più parametri, ma gli osservatori Rx ricevono un singolo valore. Quindi è necessario avvolgere i parametri multipli in un singolo oggetto.
Con questo in mente, un'alternativa all'uso di Subject
è quella di utilizzare Observable.Create
. In questo modo si registra solo la richiamata quando c'è effettivamente un osservatore e si annulla la registrazione se tale osservatore annulla la sottoscrizione.
Per l'API sincrono hai utilizzato un esempio, potresti fare qualcosa di simile. Nota in questo esempio non esiste in realtà un modo per annullare la registrazione del mid-stream di callback poiché tutto avviene in modo sincrono prima di poter mai restituire l'eliminabile di annullamento dell'iscrizione.
public static IObservable<Foo> WrapFooApi(string arg1, string arg2)
{
return Observable.Create<Foo>(observer =>
{
FooApi.enumerate(arg1, arg2, e =>
{
observer.OnNext(new Foo(e));
return true;
});
// In your case, FooApi.enumerate is actually synchronous
// so when we get to this line of code, we know
// the stream is complete.
observer.OnCompleted();
return Disposable.Empty;
});
}
// Usage
WrapFooApi("a", "b").Take(1).Subscribe(...); // only takes first item
possiamo risolvere il problema con l'essere in grado di fermare presto con l'introduzione di un po 'di asincronia, che darà il tempo dell'osservatore per ottenere un usa e getta che si può disporre per informare l'utente. Possiamo usare CreateAsync
per ottenere un CancellationToken
che annullerà quando l'osservatore annulla la sottoscrizione. E siamo in grado di eseguire il codice FooApi
all'interno Task.Run
:
public static IObservable<Foo> WrapFooApi(string arg1, string arg2)
{
return Observable.CreateAsync<Foo>(async (observer, ct) =>
{
await Task.Run(() => FooApi.register_callback(arg1, arg2, e =>
{
observer.OnNext(e);
// Returning false will stop the enumeration
return !ct.IsCancellationRequested;
}));
observer.OnCompleted();
});
}
In un più tradizionale API callback asincroni, dove si registra ad un certo punto e annullare la registrazione in un altro punto, si potrebbe avere qualcosa di più simile a questo:
public static IObservable<Foo> WrapFooApi(string args)
{
return Observable.Create<Foo>(observer =>
{
FooToken token = default(FooToken);
var unsubscribe = Disposable.Create(() => FooApi.Unregister(token));
token = FooApi.Register(args, e =>
{
observer.OnNext(new Foo(e));
});
return unsubscribe;
});
}
Oppure puoi semplicemente usare un 'Subject' e chiamare' OnNext' su di esso nella richiamata. – Dirk
@Dirk, interessante, grazie. Quindi, 'Subject.OnNext' quindi' Subject.OnComplete' quando non ci sono più elementi? – avo
Sì, il soggetto implementa sia 'IObservable' che' IObserver'. Chiama OnNext/OnError/OnCompleted per inviare tali comandi agli utenti iscritti all'oggetto.Funzionano come una sorta di gateway da codice non-Rx a Rx. – Dirk