2013-07-04 12 views
7

Ho un metodo di async che è un metodo di lunga durata che legge un ruscello e quando trova qualcosa genera un evento:Conversione di un metodo asincrono per tornare IObservable <>

public static async void GetStream(int id, CancellationToken token) 

Si prende un gettone cancellazione perché è creato in un nuovo compito. Internamente chiama await quando legge un flusso:

var result = await sr.ReadLineAsync() 

Ora, voglio convertire questo a un metodo che restituisce un IObservable <> in modo che posso usare questo con le estensioni reattivi. Da quello che ho letto, il modo migliore per farlo è usare Observable.Create, e dal momento che RX 2.0 ora supporta anche asincrona posso ottenere tutto per lavorare con qualcosa di simile:

public static IObservable<Message> ObservableStream(int id, CancellationToken token) 
{ 
    return Observable.Create<Message>(
     async (IObserver<Message> observer) => 
      { 

Il resto del codice all'interno è lo stesso, ma invece di lanciare eventi, sto chiamando observer.OnNext(). Ma questo sembra sbagliato. Per prima cosa sto mixando CancellationTokens e, sebbene l'aggiunta della parola chiave async abbia funzionato, è questa la cosa migliore da fare? Chiamo il mio ObservableStream in questo modo:

Client.ObservableStream(555404, token).ObserveOn(Dispatcher.CurrentDispatcher).SubscribeOn(TaskPoolScheduler.Default).Subscribe(m => Messages.Add(m)); 
+1

si dovrebbe quasi mai usare 'asincrona void', certamente nei metodi di libreria. – svick

risposta

1

si dovrebbe cambiare GetStream per restituire un compito, invece di void (ritorno asincrona vuoto non è buona, tranne quando assolutamente necessario, come svick commentato). Una volta restituita un'attività, puoi semplicemente chiamare. ToObservable() e il gioco è fatto.

Ad esempio:

public static async Task<int> GetStream(int id, CancellationToken token) { ... } 

Poi,

GetStream(1, new CancellationToken(false)) 
    .ToObservable() 
    .Subscribe(Console.Write); 
+0

Penso che 'GetStream() 'spari un evento più volte, quindi' Task 'non sarebbe sufficiente. – svick

+0

Sì, la firma era un esempio, ma nella modifica, sto implicando di eliminare GetStream direttamente usando ToObservable(). Sto aggiornando la mia risposta con alcuni chiarimenti. –

+0

Ah, giusto, grazie. Sì, GetStream viene eseguito per sempre e genera un evento quando trova un messaggio. Quindi, se usassi 'Task ', avrebbe senso? Vorrei abbandonare l'evento sparando lì dentro per qualcos'altro? –

11

Lei ha ragione. Una volta che si rappresenta l'interfaccia tramite IObservable, è necessario evitare di richiedere ai chiamanti di fornire uno CancellationToken. Ciò non significa che non puoi usarli internamente. Rx fornisce diversi meccanismi per produrre le istanze CancellationToken che vengono cancellate quando l'osservatore si annulla dal tuo osservabile.

Ci sono diversi modi per affrontare il problema. Il più semplice non richiede quasi modifiche nel codice. Esso utilizza un sovraccarico di Observable.Create cui si fornisce una CancellationToken che fa scattare, se le si cancella chiamante: non

public static IObservable<Message> ObservableStream(int id) 
{ 
    return Observable.Create<Message>(async (observer, token) => 
    { 
     // no exception handling required. If this method throws, 
     // Rx will catch it and call observer.OnError() for us. 
     using (var stream = /*...open your stream...*/) 
     { 
      string msg; 
      while ((msg = await stream.ReadLineAsync()) != null) 
      { 
       if (token.IsCancellationRequested) { return; } 
       observer.OnNext(msg); 
      } 
      observer.OnCompleted(); 
     } 
    }); 
} 
+0

Non dovrebbe 'if (token.IsCancellationRequested) {return; } 'be' if (token.IsCancellationRequested) {break; } 'così viene chiamato' OnCompleted'? – TiMoch

+0

@TiMoch nessun osservatore ha già annullato l'iscrizione in questo caso non è necessario inviare loro eventuali notifiche aggiuntive – Brandon

Problemi correlati