2012-04-12 12 views
8

Come parte di applicazione (in produzione per circa 4 mesi) abbiamo un flusso di dati provenienti da un dispositivo esterno che si convertono ad un IObservablemetodo preferito per generare un IObservable <String> da un flusso

Fino ora abbiamo usato il seguente per generarlo, e ha funzionato abbastanza bene.

IObservable<string> ObserveStringStream(Stream inputStream) 
{ 
    var streamReader = new StreamReader(inputStream); 
    return Observable 
      .Create<string>(observer => Scheduler.ThreadPool 
      .Schedule(() => ReadLoop(streamReader, observer))); 
} 

private void ReadLoop(StreamReader reader, IObserver<string> observer) 
{ 
    while (true) 
    { 
     try 
     { 
      var line = reader.ReadLine(); 
      if (line != null) 
      { 
       observer.OnNext(line); 
      } 
      else 
      { 
       observer.OnCompleted(); 
       break; 
      } 
     } 
     catch (Exception ex) 
     { 
      observer.OnError(ex); 
      break; 
     } 
    } 
} 

Ieri sera mi sono chiesto se ci fosse un modo per utilizzare la sintassi yield return per ottenere lo stesso risultato e si avvicinò con questo:

IObservable<string> ObserveStringStream(Stream inputStream) 
{ 
    var streamReader = new StreamReader(inputStream); 
    return ReadLoop(streamReader) 
      .ToObservable(Scheduler.ThreadPool); 
} 

private IEnumerable<string> ReadLoop(StreamReader reader) 
{ 
    while (true) 
    { 
     var line = reader.ReadLine(); 
     if (line != null) 
     { 
      yield return line; 
     } 
     else 
     { 
      yield break; 
     } 
    } 
} 

Sembra funzionare abbastanza bene ed è molto più pulito, ma mi stavo chiedendo se ci fossero pro o contro per un modo rispetto all'altro, o se ci fosse un modo migliore del tutto.

+2

Pro: 'return' rendimento supporta lazy loading/fine della vostra collezione. –

+2

Con: quando viene generata un'eccezione non chiama OnException, si bolle solo su –

+0

Immagino che dipende se non ti dispiace la masterizzazione di un thread per eseguire il ciclo di lettura, che scende a quanti dispositivi è necessario supportare. Ho scritto un AsyncTextReader che era a sua volta Osservabile per fare qualcosa di simile, ma in scala. Sicuramente in questi giorni si potrebbe ATTENDERE qualcosa ... – piers7

risposta

11

Penso che ci sia una buona idea lì (trasformare Stream in Enumerable quindi IObservable). Tuttavia, il codice Enumberable può essere molto più pulito:

IEnumerable<string> ReadLines(Stream stream) 
{ 
    using (StreamReader reader = new StreamReader(stream)) 
    { 
     while (!reader.EndOfStream) 
      yield return reader.ReadLine(); 
    } 
} 

E poi osservabile:

IObservable<string> ObserveLines(Stream inputStream) 
{ 
    return ReadLines(inputStream).ToObservable(Scheduler.ThreadPool); 
} 

Questo è più breve, più leggibile, e adeguatamente dismette i flussi. È anche pigro.

L'estensione ToObservable si occupa di rilevare gli eventi OnNext (nuove righe), nonché l'evento OnCompleted (fine di enumerable) e OnError.

+0

Bello, molto pulito. Dovrò provarlo domani. La mia unica preoccupazione è che io possa ottenere un valore finale null come ultimo elemento nell'Osservable, ma è facile filtrare con .Where (line => line! = Null) – baralong

2

Non ho il codice a portata di mano, ma ecco come farlo asincrono CTP pre-asincrono.

[Nota per scremare-lettori: non c'è bisogno di preoccuparsi se non c'è bisogno di scalare molto più]

Creare un'implementazione AsyncTextReader, che è di per sé osservabile. Il ctor acquisisce un flusso ed esegue una BeginRead (256byte) sul flusso, passandosi come continuazione, quindi ritornando.

Quando viene inserita la continuazione, chiamare EndRead e aggiungere i byte restituiti in un buffer piccolo della classe. Ripeti finché il buffer non contiene una o più sequenze di fine riga (come da TextWriter). Quando ciò accade, invia quei bit (s) del buffer fuori come una stringa tramite l'interfaccia Observable e ripeti.

Al termine, segnalare Completamento completo ecc. (E smaltire il flusso). Se ricevi un'eccezione generata da EndReadByte nella tua continuazione, prendi e rilascia l'interfaccia OnError.

codice chiamante appare quindi come:

IObservable = new AsyncTextReader (stream);

Questo bilancia bene. Basta fare in modo di non fare nulla di troppo stupido con la gestione del buffer.

pseudo codice:

public ctor(Stream stream){ 
    this._stream = stream; 
    BeginRead(); 
    return; 
} 

private void BeginRead(){ 
    // kick of async read and return (synchronously) 
    this._stream.BeginRead(_buffer,0,256,EndRead,this); 
} 

private void EndRead(IAsyncResult result){ 
    try{ 
     // bytesRead will be *up to* 256 
     var bytesRead = this._stream.EndRead(result); 
     if(bytesRead < 1){ 
      OnCompleted(); 
      return; 
     } 
     // do work with _buffer, _listOfBuffers 
     // to get lines out etc... 
     OnNext(aLineIFound); // times n 
     BeginRead(); // go round again 
    }catch(Exception err){ 
     OnException(err); 
    } 
} 

Ok, questo è l'APM, e qualcosa che solo una madre potrebbe amare. Attendo vivamente l'alternativa.

ps: se il lettore dovrebbe chiudere il flusso è una domanda interessante. Dico di no, perché non l'ha creato.

0

Con asincrone/attendono il supporto, il seguente è più probabile che la soluzione migliore:

IObservable<string> ObserveStringStream(Stream inputStream) 
{ 
    return Observable.Using(() => new StreamReader(inputStream), 
     sr => Observable.Create<string>(async (obs, ct) => 
     { 
      while (true) 
      { 
       ct.ThrowIfCancellationRequested(); 
       var line = await sr.ReadLineAsync().ConfigureAwait(false); 
       if (line == null) 
        break; 
       obs.OnNext(line); 
      } 
      obs.OnCompleted(); 
    })); 
} 
Problemi correlati