2012-05-31 8 views
7

Ho un osservabile che rappresenta un flusso di prezzi delle azioni. Se non ci sono osservatori sulla mia sequenza osservabile mi piacerebbe essere in grado di disconnettersi dal server remoto che fornisce il flusso di prezzi, ma non voglio farlo finché ogni osservatore non ha chiamato Dispose(). Quindi, in modo simile, quando la prima persona chiama Iscriviti mi piacerebbe ricollegarmi al server remoto.Traccia il (numero di) osservatori in un Osservabile?

C'è un modo per capire quanti osservatori hanno chiamato iscriversi su un osservabile? O forse un modo per sapere quando gli osservatori chiamano Abbonati o Disponi?

risposta

3

IObservable<T> è un interface che è possibile implementare. Nel metodo Iscriviti dell'interfaccia è possibile tenere traccia degli osservatori mantenendo una lista internamente.

Lo snippet di codice seguente è di MSDN.

private List<IObserver<Location>> observers; 

public IDisposable Subscribe(IObserver<Location> observer) 
{ 
    if (! observers.Contains(observer)) 
     observers.Add(observer); 

    // ------- If observers.Count == 1 create connection. ------- 

    return new Unsubscriber(observers, observer); 
} 
private class Unsubscriber : IDisposable 
{ 
    private List<IObserver<Location>>_observers; 
    private IObserver<Location> _observer; 

    public Unsubscriber(List<IObserver<Location>> observers, IObserver<Location> observer) 
    { 
     this._observers = observers; 
     this._observer = observer; 
    } 

    public void Dispose() 
    { 
     if (_observer != null && _observers.Contains(_observer)) 
     _observers.Remove(_observer); 
     // ----------- if observers.Count == 0 close connection ----------- 
    } 
} 
+0

Sì, ho pensato che era così che avrei dovuto farlo. Speravo di poter sfruttare solo uno degli oggetti incorporati, ma sembra che dovrò completare uno di questi (molto probabilmente BehaviorSubject) in modo da poter tenere traccia degli abbonati. –

+0

Questa soluzione non fornisce alcuna sicurezza del thread. Avrà bisogno di un po 'di lavoro prima di andare in produzione. – Enigmativity

9

Vorrei semplicemente utilizzare RefCount/Publish. Ho sempre la sensazione che se sto implementando IObservable sto lavorando troppo duramente.

myColdObservable.Publish().RefCount(); 

Questo renderà la vostra sosta pulsare osservabile dopo che tutti si è disconnesso. Ecco un esempio:

var coldObservable = Observable 
    .Interval(TimeSpan.FromSeconds(1)) 
    .ObserveOn(Scheduler.TaskPool) 
    .Select(_ => DoSomething()); 

var refCountObs = coldObservable.Publish().RefCount(); 

CompositeDisposable d = new CompositeDisposable(); 
d.Add(refCountObs.Subscribe(n => Console.WriteLine("First got: " + n))); 
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Second got: " + n))); 
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Third got: " + n))); 

//Wait a bit for work to happen 
System.Threading.Thread.Sleep(10000); 

//Everyone unsubscribes 
d.Dispose(); 

//Observe that DoSomething is not called. 
System.Threading.Thread.Sleep(3000); 

questo non copre il caso in cui si vuole realmente conoscere il numero di di abbonati, ma penso che questo si adatta con le vostre esigenze di fermare il lavoro se non ci sono abbonati.

+0

Questo approccio non ti dà il numero di abbonati ma ferma la fonte osservabile quando tutti gli abbonati sono finiti. Molto meglio che implementare il tuo osservabile. – Enigmativity

+0

Questa è la migliore risposta –

3

In generale, non implementare IObservable; di solito c'è già qualcosa in Rx che può aiutarti, direttamente o attraverso la composizione. Se è necessario implementare IObservable, utilizzare Observable.Create per fare in modo di ottenere tutti i requisiti garantiti per il contratto dell'osservatore ecc.

Per quanto riguarda il problema, il suggerimento di utilizzare Publish e RefCount è esattamente la composizione stai cercando. Se si desidera contare per qualche motivo, utilizzare Observable.Defer per intercettare le sottoscrizioni, possibilmente con Observable.Finally per intercettare le interruzioni di sequenza. In alternativa, avvolgere la fonte con Observable.Create, inoltrare l'osservatore alla sequenza spostata e avvolgere l'ID restituito con logica di conteggio (utilizzando Disposable.Create).

Cheers,

-Bart (squadra Rx)

4

po 'di un vecchio, ma mi sono imbattuto in questo post come ho avuto un problema in cui avevo bisogno di sapere il numero di abbonati. Usando il suggerimento di Bart, ho avuto questa estensione.

public static IObservable<T> CountSubscribers<T>(this IObservable<T> source, Action<int> countChanged) 
{ 
int count = 0; 

return Observable.Defer(() => 
{ 
    count = Interlocked.Increment(ref count); 
    countChanged(count); 
    return source.Finally(() => 
    { 
     count = Interlocked.Decrement(ref count); 
     countChanged(count); 
    }); 
}); 
} 
Problemi correlati