2015-03-25 13 views
5

Ho una classe che sarà responsabile della generazione di eventi in un intervallo frequente ma irregolare, che le altre classi devono consumare e operare. Voglio usare Reactive Extensions per questa attività.Come si spinge un'entità su un Rx Observable?

Il lato consumer di questo è molto semplice; Ho la mia classe di consumatori che implementa lo IObserver<Payload> e tutto sembra a posto. Il problema arriva nella classe dei produttori.

Implementazione IObservable<Payload> direttamente (cioè, mettendo il mio propria implementazione per IDisposable Subscribe(IObserver<Payload>) è, secondo la documentazione, non è raccomandato. Suggerisce invece comporre con la Observable.Create() insieme di funzioni. Dal momento che la mia classe verrà eseguito per molto tempo, I' ve provato a generare un 'osservabile con var myObservable = Observable.Never(), e poi, quando mi arrivano nuovi Payloads disponibili, chiamando myObservable.Publish(payloadData). quando faccio questo, però, non mi sembra di colpire l'attuazione OnNext nel mio consumo.

credo, come un work-around, posso creare un evento nella mia classe e quindi creare l'Observable usando la funzione FromEvent, ma questo sembra un co troppo approccio complicato (cioè, sembra strano che la nuova hotness di Observables "richieda" che gli eventi funzionino). C'è un approccio semplice che sto trascurando qui? Qual è il modo "standard" per creare le tue fonti osservabili?

+1

Come regola generale, se ti trovi a implementare "IObservable " o "IObserver ", probabilmente stai facendo qualcosa di sbagliato. – Enigmativity

risposta

6

Creare un new Subject<Payload> e chiamare è OnNext metodo per inviare un evento. È possibile Subscribe per l'oggetto con l'osservatore.

L'uso di soggetti è spesso dibattuto. Per una discussione approfondita sull'uso dei soggetti (che rimanda a un primer), vedere here - ma in sintesi, questo caso d'uso sembra valido (vale a dire che probabilmente soddisfa i criteri locali + hot).

Come accantonamento, i sovraccarichi di sottoscrittori che accettano delegati possono rimuovere la necessità di fornire un'implementazione di IObserver<T>.

+0

Il concetto di Oggetto e i collegamenti forniti mi hanno aiutato a capire questo in modo più completo. – GWLlosa

1

Observable.Never() non "invia" le notifiche, è necessario utilizzare Observable.Return(yourValue)

Se avete bisogno di una guida con esempi concreti vi consiglio di leggere Intro to Rx

+0

Posso essere molto confuso qui ... Se utilizzo Observable.Return, non sarà visibile solo l'unico valore osservabile? Ne ho bisogno uno che rimanga in vita per un po ', così posso periodicamente spingere più valori su di esso mentre li computo. – GWLlosa

+0

Dovresti davvero leggere "Intro to Rx", IObservable è una sequenza di eventi, quando l'Observer ottiene OnNext (valore T), ottieni l'ultimo valore con cui lavorare. – kondas

+1

Secondo i documenti, "Un ultimo metodo di produzione essenziale o costruttore primitivo si chiama Return. Il suo ruolo è quello di rappresentare una sequenza singleelement , proprio come un array di celle singole dovrebbe essere nel mondo delle sequenze IEnumerable.Il comportamento osservato gli osservatori sottoscritti sono due messaggi: OnNext con il valore e la segnalazione OnCompleted è stata ricevuta la fine della sequenza: IObservable source = Observable.Return (42); L'esecuzione di questo codice produce il seguente output: OnNext: 42 OnCompleted ".Non voglio il segnale completato, voglio che la sequenza rimanga aperta/attiva – GWLlosa

0

A meno che non mi imbatto in un modo migliore di farlo, quello che ho 'si è stabilito per ora è l'uso di un BlockingCollection.

var _itemsToSend = new BlockingCollection<Payload>(); 
IObservable<MessageQueuePayload> _deliverer = 
_itemsToSend.GetConsumingEnumerable().ToObservable(Scheduler.Default); 
Problemi correlati