2009-11-20 13 views
37

Le estensioni reattive sono dotate di molti metodi di supporto per trasformare eventi esistenti e operazioni asincrone in oggetti osservabili, ma come implementare un IObservable <T> da zero?Implementazione IObservable <T> da zero

IEnumerable ha la parola chiave yield interessante per renderla molto semplice da implementare.

Qual è il modo corretto di implementare IObservable <T>?

Devo preoccuparmi della sicurezza del filo?

So che esiste un supporto per ottenere richiamato su un contesto di sincronizzazione specifico ma è qualcosa che io come IObservable <T> autore di cui preoccuparsi o questo in qualche modo integrato?

aggiornamento:

Ecco la mia C# versione della soluzione di Brian F #

using System; 
using System.Linq; 
using Microsoft.FSharp.Collections; 

namespace Jesperll 
{ 
    class Observable<T> : IObservable<T>, IDisposable where T : EventArgs 
    { 
     private FSharpMap<int, IObserver<T>> subscribers = 
       FSharpMap<int, IObserver<T>>.Empty; 
     private readonly object thisLock = new object(); 
     private int key; 
     private bool isDisposed; 

     public void Dispose() 
     { 
      Dispose(true); 
     } 

     protected virtual void Dispose(bool disposing) 
     { 
      if (disposing && !isDisposed) 
      { 
       OnCompleted(); 
       isDisposed = true; 
      } 
     } 

     protected void OnNext(T value) 
     { 
      if (isDisposed) 
      { 
       throw new ObjectDisposedException("Observable<T>"); 
      } 

      foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value)) 
      { 
       observer.OnNext(value); 
      } 
     } 

     protected void OnError(Exception exception) 
     { 
      if (isDisposed) 
      { 
       throw new ObjectDisposedException("Observable<T>"); 
      } 

      if (exception == null) 
      { 
       throw new ArgumentNullException("exception"); 
      } 

      foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value)) 
      { 
       observer.OnError(exception); 
      } 
     } 

     protected void OnCompleted() 
     { 
      if (isDisposed) 
      { 
       throw new ObjectDisposedException("Observable<T>"); 
      } 

      foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value)) 
      { 
       observer.OnCompleted(); 
      } 
     } 

     public IDisposable Subscribe(IObserver<T> observer) 
     { 
      if (observer == null) 
      { 
       throw new ArgumentNullException("observer"); 
      } 

      lock (thisLock) 
      { 
       int k = key++; 
       subscribers = subscribers.Add(k, observer); 
       return new AnonymousDisposable(() => 
       { 
        lock (thisLock) 
        { 
         subscribers = subscribers.Remove(k); 
        } 
       }); 
      } 
     } 
    } 

    class AnonymousDisposable : IDisposable 
    { 
     Action dispose; 
     public AnonymousDisposable(Action dispose) 
     { 
      this.dispose = dispose; 
     } 

     public void Dispose() 
     { 
      dispose(); 
     } 
    } 
} 

edit: Non gettare ObjectDisposedException se Dispose viene chiamato due volte

+1

Wes Dyer ora ha un video su Channel9 che parla dei contratti per queste interfacce. – Benjol

+1

(anni 30 più tardi ... http://channel9.msdn.com/posts/J.Van.Gogh/Reactive-Extensions-API-in-depth-Contract/) – Benjol

+0

Cool - sarà sicuro di guardarlo :) –

risposta

10

Onestamente, non sono sicuro di come sia "giusto" tutto questo, ma se si sente abbastanza bene in base alla mia esperienza fino ad ora. È il codice F #, ma si spera che tu abbia un'idea del sapore. Ti consente di "rinnovare" un oggetto sorgente, che puoi quindi chiamare Successivo/Completato/Errore attivo, e gestisce le sottoscrizioni e cerca di fare assegnamento quando l'origine oi client fanno cose cattive.

type ObservableSource<'T>() =  // ' 
    let protect f = 
     let mutable ok = false 
     try 
      f() 
      ok <- true 
     finally 
      Debug.Assert(ok, "IObserver methods must not throw!") 
      // TODO crash? 
    let mutable key = 0 
    // Why a Map and not a Dictionary? Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over 
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>> // ' 
    let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun() -> v.OnNext(x))) 
    let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun() -> v.OnCompleted())) 
    let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun() -> v.OnError(e))) 
    let thisLock = new obj() 
    let obs = 
     { new IObservable<'T> with  // ' 
      member this.Subscribe(o) = 
       let k = 
        lock thisLock (fun() -> 
         let k = key 
         key <- key + 1 
         subscriptions <- subscriptions.Add(k, o) 
         k) 
       { new IDisposable with 
        member this.Dispose() = 
         lock thisLock (fun() -> 
          subscriptions <- subscriptions.Remove(k)) } } 
    let mutable finished = false 
    // The methods below are not thread-safe; the source ought not call these methods concurrently 
    member this.Next(x) = 
     Debug.Assert(not finished, "IObserver is already finished") 
     next x 
    member this.Completed() = 
     Debug.Assert(not finished, "IObserver is already finished") 
     finished <- true 
     completed() 
    member this.Error(e) = 
     Debug.Assert(not finished, "IObserver is already finished") 
     finished <- true 
     error e 
    // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads 
    member this.Value = obs 

Sarò interessato a qualsiasi pensiero su ciò che è buono o cattivo qui; Non ho avuto la possibilità di guardare tutte le cose nuove Rx da ancora devlabs ...

Le mie esperienze suggeriscono che:

  • Coloro che sottoscrivono osservabili non dovrebbe mai buttare dagli abbonamenti. Non c'è nulla di ragionevole che un osservabile possa fare quando un abbonato lancia. (Questo è simile agli eventi.Molto probabilmente l'eccezione diventerà solo un gestore catch-all di primo livello o bloccherà l'app.
  • Le fonti probabilmente dovrebbero essere "logicamente single threaded". Penso che potrebbe essere più difficile scrivere client in grado di reagire alle chiamate OnNext simultanee; anche se ogni singola chiamata proviene da un thread diverso, è utile evitare chiamate contemporanee.
  • È decisamente utile avere una classe base/helper che imponga alcuni "contratti".

Sono molto curioso se le persone possono mostrare più consigli concreti lungo queste linee.

+1

Grazie, ho avuto una crepa nella creazione di qualcosa di simile in C# e ho finito per usare la raccolta F # Map per evitare il blocco durante l'enumerazione. Un'altra opzione è usare qualcosa come Immutable AVLTree di Eric Lippert. Mi sono convinto che è responsabilità dell'osservatore assicurarsi che gli eventi siano ricevuti nel giusto contesto e l'osservabile dovrebbe limitarsi ad aumentare eventi sullo stesso thread ogni volta (come si scrive). –

2
  1. Crack aperto Riflettore e dai un'occhiata.

  2. guardare alcuni video C9 - this una mostra come si puo 'derivare' Select 'combinatore'

  3. Il segreto è quello di creare classi AnonymousObservable, AnonymousObserver e AnonymousDisposable, (che sono solo di lavoro arounds per il fatto che non puoi istanziare interfacce). Contengono l'implementazione zero, mentre la passate con Actions e Func.

Ad esempio:

public class AnonymousObservable<T> : IObservable<T> 
{ 
    private Func<IObserver<T>, IDisposable> _subscribe; 
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe) 
    { 
     _subscribe = subscribe; 
    } 

    public IDisposable Subscribe(IObserver<T> observer) 
    { 
     return _subscribe(observer); 
    } 
} 

ti farò lavorare il resto ... è un ottimo esercizio di comprensione.

C'è un bel thread in crescita here con domande correlate.

+1

Grazie, ma non così tanto utile. Ho già visitato sia il riflettore che la maggior parte dei video di C9. Reflector mostra solo l'effettiva implementazione ed è molto difficile dedurre regole riguardanti il ​​threading e simili da esso. Anche il tuo cosiddetto segreto spinge semplicemente la responsabilità di un'implementazione corretta dalla classe osservabile reale al Func fornito - non rivela le regole per implementare quel Func. Quindi in pratica non mi hai detto nulla se non di capire il resto da solo :) –

+1

Punto preso.Per essere onesti, la maggior parte dei miei sforzi finora hanno cercato di scrivere quelli che chiamano "combinatori" invece che fonti reali. Puoi trovare alcune linee guida dalle risposte alla mia domanda qui (il posto migliore per ottenere risposte "ufficiali" al momento): http://social.msdn.microsoft.com/Forums/en-US/rx/thread/79402dd3 -009a-46db-9b55-06482e8cad0e – Benjol

2

solo un'osservazione riguardo a questa implementazione:

dopo collezioni concorrenti in fase di introduzione in .net fw 4 è probabilmente meglio utilizzare ConcurrentDictioary invece di un semplice dizionario.

salva le serrature di movimentazione sulla raccolta.

adi.

6

Sì, la parola chiave yield è bella; forse ci sarà qualcosa di simile per IObservable (OfT)? [Edit:. In di PDC '09 talk Eric Meijer, dice "sì, guardi questo spazio" per una resa dichiarativa per la generazione di osservabili]

Per qualcosa di simile (invece di tirare il proprio), check out the bottom del "(not yet) 101 Rx Samples" Wiki, dove il team suggerisce l'uso della classe Subject (T) come "backend" per implementare un IObservable (OfT). Ecco il loro esempio:

public class Order 
{    
    private DateTime? _paidDate; 

    private readonly Subject<Order> _paidSubj = new Subject<Order>(); 
    public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } } 

    public void MarkPaid(DateTime paidDate) 
    { 
     _paidDate = paidDate;     
     _paidSubj.OnNext(this); // Raise PAID event 
    } 
} 

private static void Main() 
{ 
    var order = new Order(); 
    order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe 

    order.MarkPaid(DateTime.Now); 
} 
+0

IMHO 'Subject' è sicuramente il modo giusto per andare quando vuoi generare i tuoi osservabili. –

+2

Solo una nota a margine, AsyncSubject è una scelta migliore qui, perché mantiene l'ultimo valore per i futuri abbonati. Nel tuo esempio è necessario iscriversi prima che si verifichi l'evento di pagamento vero e proprio. – Nappy

+0

@Nappy: non sapevo di 'AsyncSubject ' -grazie per averlo menzionato. –

11

Il official documentation depreca gli utenti di attuazione IObservable stessi. Invece, gli utenti devono utilizzare il metodo factory

Quando possibile, implementare nuovi operatori componendo operatori esistenti. In caso contrario, implementare operatori personalizzati utilizzando Observable.Create

Succede che Observable.Create è un wrapper banale attorno classe interna del Reattivo AnonymousObservable:

public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe) 
{ 
    if (subscribe == null) 
    { 
     throw new ArgumentNullException("subscribe"); 
    } 
    return new AnonymousObservable<TSource>(subscribe); 
} 

Non so perché non hanno fatto la loro attuazione pubblico, ma hey, qualunque cosa.

+0

corretto. Non implementare 'IObservable ' o 'IObserver '. –

+0

Ciao Lee. Adoro il tuo libro-ex-blog su RX, una guida molto migliore rispetto ai documenti ufficiali. –

+1

Cheers. Poiché ora Rx è stato open source, spero di essere in grado di aiutare il team a aggiornare il codice/i documenti ufficiali. –

Problemi correlati