Le estensioni reattive sono dotate di molti metodi di supporto per trasformare eventi esistenti e operazioni asincrone in oggetti osservabili, ma come implementare un IObservable <T> da zero?Implementazione IObservable <T> da zero
IEnumerable ha la parola chiave yield interessante per renderla molto semplice da implementare.
Qual è il modo corretto di implementare IObservable <T>?
Devo preoccuparmi della sicurezza del filo?
So che esiste un supporto per ottenere richiamato su un contesto di sincronizzazione specifico ma è qualcosa che io come IObservable <T> autore di cui preoccuparsi o questo in qualche modo integrato?
aggiornamento:
Ecco la mia C# versione della soluzione di Brian F #
using System;
using System.Linq;
using Microsoft.FSharp.Collections;
namespace Jesperll
{
class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
{
private FSharpMap<int, IObserver<T>> subscribers =
FSharpMap<int, IObserver<T>>.Empty;
private readonly object thisLock = new object();
private int key;
private bool isDisposed;
public void Dispose()
{
Dispose(true);
}
protected virtual void Dispose(bool disposing)
{
if (disposing && !isDisposed)
{
OnCompleted();
isDisposed = true;
}
}
protected void OnNext(T value)
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}
foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnNext(value);
}
}
protected void OnError(Exception exception)
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}
if (exception == null)
{
throw new ArgumentNullException("exception");
}
foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnError(exception);
}
}
protected void OnCompleted()
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}
foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnCompleted();
}
}
public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null)
{
throw new ArgumentNullException("observer");
}
lock (thisLock)
{
int k = key++;
subscribers = subscribers.Add(k, observer);
return new AnonymousDisposable(() =>
{
lock (thisLock)
{
subscribers = subscribers.Remove(k);
}
});
}
}
}
class AnonymousDisposable : IDisposable
{
Action dispose;
public AnonymousDisposable(Action dispose)
{
this.dispose = dispose;
}
public void Dispose()
{
dispose();
}
}
}
edit: Non gettare ObjectDisposedException se Dispose viene chiamato due volte
Wes Dyer ora ha un video su Channel9 che parla dei contratti per queste interfacce. – Benjol
(anni 30 più tardi ... http://channel9.msdn.com/posts/J.Van.Gogh/Reactive-Extensions-API-in-depth-Contract/) – Benjol
Cool - sarà sicuro di guardarlo :) –