2011-09-06 10 views
20

Quello che voglio fare è assicurarsi che se l'unico riferimento al mio osservatore è l'osservabile, esso viene raccolto e smesso di ricevere messaggi.Creazione di un abbonamento debole a un IObassable

Dire Ho un controllo con una casella di riepilogo su di esso chiamato Messaggi e questo codice dietro:

//Short lived display of messages (only while the user's viewing incoming messages) 
public partial class MessageDisplay : UserControl 
{ 
    public MessageDisplay() 
    { 
     InitializeComponent(); 
     MySource.IncomingMessages.Subscribe(m => Messages.Items.Add(m)); 
    } 
} 

Che si connette a questa fonte:

//Long lived location for message store 
static class MySource 
{ 
    public readonly static IObservable<string> IncomingMessages = new ReplaySubject<string>; 
} 

Quello che non voglio è per mantenere il messaggio visualizzato in memoria molto tempo dopo che non è più visibile. Idealmente mi piacerebbe un po 'di estensione in modo da poter scrivere:

MySource.IncomingMessages.ToWeakObservable().Subscribe(m => Messages.Items.Add(m)); 

anche io non voglio fare affidamento sul fatto che MessageDisplay è un controllo utente come io poi vogliono andare per una configurazione con MVVM MessageDisplayViewModel che non sarà un controllo dell'utente.

+1

Avete qualche posto nel codice quando si sa che non lo fai vuoi più l'Observable?In tal caso, è possibile prendere il 'IDisposable' restituito dal metodo' Subscribe' per sbarazzarsi di esso quando è necessario. – lbergnehr

+0

@seldon Potrei usarlo per questo specifico esempio, solo ogni volta che la finestra del messaggio è chiusa, ma voglio un approccio molto più generale così posso usare questa funzionalità in modo più ampio e impedire ad altri programmatori che usano le mie librerie di dimenticare qualcosa da qualche parte . Ho visto qualcosa di correlato in MVVMLightToolkit, ma non per IObservable e non capisco davvero come funzioni e queste cose sono notoriamente difficili da ottenere. – ForbesLindesay

+0

Si potrebbe fare riferimento alla classe 'WeakReference', che può essere utilizzata per fare in modo che le istanze vengano raccolte dal garbage collector anche se sono 'referenziate'. Tuttavia, ci sono molti operatori nelle estensioni reattive che si occupano di smaltire l'osservabile a un certo punto. Se hai qualche evento o qualcosa del genere quando sai che hai finito con l'osservabile, forse quelli dovrebbero bastare? – lbergnehr

risposta

0

Il codice di seguito si ispira originale di DTB inviare. L'unico cambiamento è che restituisce un riferimento all'osservatore come parte dell'IDisposable. Ciò significa che il riferimento all'IObserver verrà mantenuto attivo finché si mantiene un riferimento all'IDisposable che si ottiene alla fine della catena (presupponendo che tutti i prodotti usa e getta mantengano un riferimento al disposable prima di essi). Ciò consente l'utilizzo dei metodi di estensione come Subscribe(M=>DoSomethingWithM(M)) perché manteniamo un riferimento a IObserver implicitamente costruito ma non manteniamo un riferimento forte dall'origine al server IOb (che produce un porro di memoria).

using System.Reactive.Linq; 

static class WeakObservation 
{ 
    public static IObservable<T> ToWeakObservable<T>(this IObservable<T> observable) 
    { 
     return Observable.Create<T>(observer => 
      (IDisposable)new DisposableReference(new WeakObserver<T>(observable, observer), observer) 
      ); 
    } 
} 

class DisposableReference : IDisposable 
{ 
    public DisposableReference(IDisposable InnerDisposable, object Reference) 
    { 
     this.InnerDisposable = InnerDisposable; 
     this.Reference = Reference; 
    } 

    private IDisposable InnerDisposable; 
    private object Reference; 

    public void Dispose() 
    { 
     InnerDisposable.Dispose(); 
     Reference = null; 
    } 
} 

class WeakObserver<T> : IObserver<T>, IDisposable 
{ 
    private readonly WeakReference reference; 
    private readonly IDisposable subscription; 
    private bool disposed; 

    public WeakObserver(IObservable<T> observable, IObserver<T> observer) 
    { 
     this.reference = new WeakReference(observer); 
     this.subscription = observable.Subscribe(this); 
    } 

    public void OnCompleted() 
    { 
     var observer = (IObserver<T>)this.reference.Target; 
     if (observer != null) observer.OnCompleted(); 
     else this.Dispose(); 
    } 

    public void OnError(Exception error) 
    { 
     var observer = (IObserver<T>)this.reference.Target; 
     if (observer != null) observer.OnError(error); 
     else this.Dispose(); 
    } 

    public void OnNext(T value) 
    { 
     var observer = (IObserver<T>)this.reference.Target; 
     if (observer != null) observer.OnNext(value); 
     else this.Dispose(); 
    } 

    public void Dispose() 
    { 
     if (!this.disposed) 
     { 
      this.disposed = true; 
      this.subscription.Dispose(); 
     } 
    } 
} 
+1

Ho appena testato questo codice e ancora perde. Io suggerisco fortemente di provare a farlo. Ci sono problemi in tutta questa linea di pensiero. 1) Oggetto Static Replay - non rilascerà mai la cache 2) Se non si implementa il pattern Dispose per Rx, che altro non si rilascia? - Gestori di eventi, connessioni IO? 3) L'utente non può disporre in modo deterministico delle proprie risorse 4) il codice in realtà non funziona 5) meno è di più. Hai più codice che ingannare altri programmatori a pensare che questo codice funzioni, dove non funziona e crea solo 100 linee di codice rumore. ** Si prega di non fare questo ** –

+0

@Lee In risposta a 1, 2, 3: lo scenario di esempio in cui si potrebbe usare questo è stato perso, NON lo si utilizzerà in una situazione in cui è necessario assicurarsi che l'origine venga eliminata . Si tratta di garantire che l'ascoltatore venga eliminato. È appropriato se il tuo Osservabile sarà comunque necessario per l'intera vita dell'applicazione, ma il tuo osservatore dovrebbe essere disposto se l'unica cosa che fa riferimento è quella osservabile. Questo praticamente risponde 4 - cioè funziona per assicurarsi che l'osservatore non sia trapelato – ForbesLindesay

+0

@Lee per rispondere a 5: È un sacco di codice, e fa qualcosa che a prima vista sembra piuttosto semplice, ma si rivela essere relativamente complesso. La ragione per cui è giustificato scrivere questo grosso pezzo di codice è che è altamente riutilizzabile. Questo funzionerà ogni volta che ti trovi nella situazione di cui sopra. Può essere esposto come API con una documentazione appropriata per ciò che fa un osservabile debole e potrebbe quindi essere utilizzato senza dover capire come funziona. – ForbesLindesay

12

È possibile sottoscrivere un osservatore proxy per l'osservabile che contiene una debolezza di riferimento per l'osservatore reale e dispone la sottoscrizione quando l'osservatore reale non è più in vita:

static IDisposable WeakSubscribe<T>(
    this IObservable<T> observable, IObserver<T> observer) 
{ 
    return new WeakSubscription<T>(observable, observer); 
} 

class WeakSubscription<T> : IDisposable, IObserver<T> 
{ 
    private readonly WeakReference reference; 
    private readonly IDisposable subscription; 
    private bool disposed; 

    public WeakSubscription(IObservable<T> observable, IObserver<T> observer) 
    { 
     this.reference = new WeakReference(observer); 
     this.subscription = observable.Subscribe(this); 
    } 

    void IObserver<T>.OnCompleted() 
    { 
     var observer = (IObserver<T>)this.reference.Target; 
     if (observer != null) observer.OnCompleted(); 
     else this.Dispose(); 
    } 

    void IObserver<T>.OnError(Exception error) 
    { 
     var observer = (IObserver<T>)this.reference.Target; 
     if (observer != null) observer.OnError(error); 
     else this.Dispose(); 
    } 

    void IObserver<T>.OnNext(T value) 
    { 
     var observer = (IObserver<T>)this.reference.Target; 
     if (observer != null) observer.OnNext(value); 
     else this.Dispose(); 
    } 

    public void Dispose() 
    { 
     if (!this.disposed) 
     { 
      this.disposed = true; 
      this.subscription.Dispose(); 
     } 
    } 
} 
+0

+1 questo esattamente il tipo di cosa che speravo. Una rapida domanda è che funzionerà ancora con i metodi di estensione per Subscribe (M => DoSomethingWithM (M)), o potrebbero ottenere la raccolta dei dati obsoleti in anticipo? Come estensione di questo, importa se questa è l'ultima cosa nella tua query o puoi eseguire anche la proiezione/query ecc.? – ForbesLindesay

+1

È necessario eseguire proiezioni e filtri prima di questo. Subscribe (M => DoSomethingWithM (M)) crea internamente un IObserver che avvolge il delegato M => DoSomethingWithM (M). È necessario mantenere attivo l'IObserver interno , che non è possibile, perché è interno. Quindi, dopo averci pensato, in realtà non consiglierei più la mia risposta. Cerca un approccio diverso. – dtb

+0

Esiste un requisito che l'IDisposable restituito quando la sottoscrizione di un evento consenta lo smaltimento asincrono? Sembrerebbe che dovrebbe essere abbastanza semplice implementare un tale comportamento facendogli contenere il riferimento all'IObserver e facendogli deselezionare quel riferimento su Dispose anche se il contesto di threading non gli consentisse di fare nient'altro; se ciò fosse fatto, l'IObservable potrebbe evitare perdite di memoria eliminando periodicamente la sua lista di abbonati Abbonati. Esaminare un utente ogni volta che un abbonamento viene aggiunto o trovato è stato rimosso sarebbe sufficiente. – supercat

2

eseguito attraverso questo filo un paio di anni dopo ... solo voluto punto in avanti per la soluzione individuata sulla Samuel Jack's blog che aggiunge un metodo di estensione per IObservable chiamato WeaklySubscribe. Usa un approccio per aggiungere uno spessore tra il soggetto e l'osservatore che traccia il bersaglio con un WeakReference. È simile alle soluzioni offerte da altri per il problema dei riferimenti forti negli abbonamenti di eventi, ad esempio in this article o this solution by Paul Stovell. Avendo per un po 'usato qualcosa basato sull'approccio di Paul, mi piace la soluzione di Samuel per gli abbonamenti IObservable deboli.

1

questa è la mia implementazione (quit semplice)

public class WeakObservable<T>: IObservable<T> 
{ 
    private IObservable<T> _source; 

    public WeakObservable(IObservable<T> source) 
    { 
     #region Validation 

     if (source == null) 
      throw new ArgumentNullException("source"); 

     #endregion Validation 

     _source = source; 
    } 

    public IDisposable Subscribe(IObserver<T> observer) 
    { 
     IObservable<T> source = _source; 
     if(source == null) 
      return Disposable.Empty; 
     var weakObserver = new WaekObserver<T>(observer); 
     IDisposable disp = source.Subscribe(weakObserver); 
     return disp; 
    } 
} 
    public class WaekObserver<T>: IObserver<T> 
{ 
    private WeakReference<IObserver<T>> _target; 

    public WaekObserver(IObserver<T> target) 
    { 
     #region Validation 

     if (target == null) 
      throw new ArgumentNullException("target"); 

     #endregion Validation 

     _target = new WeakReference<IObserver<T>>(target); 
    } 

    private IObserver<T> Target 
    { 
     get 
     { 
      IObserver<T> target; 
      if(_target.TryGetTarget(out target)) 
       return target; 
      return null; 
     } 
    } 

    #region IObserver<T> Members 

    /// <summary> 
    /// Notifies the observer that the provider has finished sending push-based notifications. 
    /// </summary> 
    public void OnCompleted() 
    { 
     IObserver<T> target = Target; 
     if (target == null) 
      return; 

     target.OnCompleted(); 
    } 

    /// <summary> 
    /// Notifies the observer that the provider has experienced an error condition. 
    /// </summary> 
    /// <param name="error">An object that provides additional information about the error.</param> 
    public void OnError(Exception error) 
    { 
     IObserver<T> target = Target; 
     if (target == null) 
      return; 

     target.OnError(error); 
    } 

    /// <summary> 
    /// Provides the observer with new data. 
    /// </summary> 
    /// <param name="value">The current notification information.</param> 
    public void OnNext(T value) 
    { 
     IObserver<T> target = Target; 
     if (target == null) 
      return; 

     target.OnNext(value); 
    } 

    #endregion IObserver<T> Members 
} 
    public static class RxExtensions 
{ 
    public static IObservable<T> ToWeakObservable<T>(this IObservable<T> source) 
    { 
     return new WeakObservable<T>(source); 
    } 
} 
     static void Main(string[] args) 
    { 
     Console.WriteLine("Start"); 
     var xs = Observable.Interval(TimeSpan.FromSeconds(1)); 
     Sbscribe(xs); 

     Thread.Sleep(2020); 
     Console.WriteLine("Collect"); 
     GC.Collect(); 
     GC.WaitForPendingFinalizers(); 
     GC.Collect(); 
     Console.WriteLine("Done"); 
     Console.ReadKey(); 
    } 

    private static void Sbscribe<T>(IObservable<T> source) 
    { 
     source.ToWeakObservable().Subscribe(v => Console.WriteLine(v)); 
    } 
0

La chiave è quello di riconoscere che si sta andando ad avere per passare sia il bersaglio e un'azione di due parametri. Un'azione a parametro unico non lo farà mai, perché o usi un riferimento debole alla tua azione (e l'azione ottiene GC'd), o usi un forte riferimento alla tua azione, che a sua volta ha un forte riferimento al bersaglio , quindi l'obiettivo non può ottenere GC'd. Tenendo questo in mente, i seguenti lavori:

using System; 

namespace Closures { 
    public static class WeakReferenceExtensions { 
    /// <summary> returns null if target is not available. Safe to call, even if the reference is null. </summary> 
    public static TTarget TryGetTarget<TTarget>(this WeakReference<TTarget> reference) where TTarget : class { 
     TTarget r = null; 
     if (reference != null) { 
     reference.TryGetTarget(out r); 
     } 
     return r; 
    } 
    } 
    public static class ObservableExtensions { 

    public static IDisposable WeakSubscribe<T, U>(this IObservable<U> source, T target, Action<T, U> action) 
     where T : class { 
     var weakRef = new WeakReference<T>(target); 
     var r = source.Subscribe(u => { 
     var t = weakRef.TryGetTarget(); 
     if (t != null) { 
      action(t, u); 
     } 
     }); 
     return r; 
    } 
    } 
} 

Esempio osservabili:

using System; 
using System.Reactive.Subjects; 

namespace Closures { 
    public class Observable { 
    public IObservable<int> ObservableProperty => _subject; 
    private Subject<int> _subject = new Subject<int>(); 
    private int n; 
    public void Fire() { 
     _subject.OnNext(n++); 
    } 
    } 
} 

Usage:

Class SomeClass { 

IDisposable disposable; 

public void SomeMethod(Observable observeMe) { 
    disposable = observeMe.ObservableProperty.WeakSubscribe(this, (wo, n) => wo.Log(n)); 
} 

    public void Log(int n) { 
    System.Diagnostics.Debug.WriteLine("log "+n); 
    } 
} 
Problemi correlati