2013-01-22 10 views
5

Sto faticando qui. Normalmente avrei letto un libro ma non ce ne sono ancora. Ho trovato innumerevoli esempi di varie cose da fare con la lettura di stream usando RX ma trovo molto difficile riuscire a capirlo.Qual è il modo corretto di creare un Observable che legge uno stream fino alla fine

So che posso utilizzare Observable.FromAsyncPattern per creare un wrapper dei metodi BeginRead/EndRead o BeginReadLine/EndReadLine di Stream.

Ma questo si legge solo una volta - quando il primo osservatore sottoscrive.

Desidero un Observable che continui a leggere e a pompare OnNext finché il flusso non si interrompe.

Oltre a questo, mi piacerebbe anche sapere come posso condividere ciò osservabile con più abbonati in modo che tutti ottengano gli elementi.

+3

Grande articolo, ma massicciamente obsoleto. Questo fa riferimento alla vecchia API. Questo capitolo * prende quei concetti e lo fa funzionare con la versione più recente di Rx. * http: //introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#CreatingYourOwnIterator –

+0

Ooh, non ho mai saputo di quel collegamento @LeeCampbell - molto bello! – JerKimball

+0

Cosa diavolo è successo a questa domanda? Dove sono le risposte precedenti? –

risposta

1

Il la soluzione è quella di utilizzare Observable.Create

Ecco un esempio che può essere adattato per ottenere la lettura di qualsiasi tipo di flusso

public static IConnectableObservable<Command> GetReadObservable(this CommandReader reader) 
    { 

     return Observable.Create<Command>(async (subject, token) => 
     { 


      try 
      { 

       while (true) 
       { 

        if (token.IsCancellationRequested) 
        { 
         subject.OnCompleted(); 
         return; 
        } 

        //this part here can be changed to something like this 
        //int received = await Task.Factory.FromAsync<int>(innerSocket.BeginReceive(data, offset, size, SocketFlags.None, null, null), innerSocket.EndReceive); 

        Command cmd = await reader.ReadCommandAsync(); 

        subject.OnNext(cmd); 

       } 

      } 

      catch (Exception ex) 
      { 
       try 
       { 
        subject.OnError(ex); 
       } 
       catch (Exception) 
       { 
        Debug.WriteLine("An exception was thrown while trying to call OnError on the observable subject -- means you're not catching exceptions everywhere"); 
        throw; 
       } 
      } 

     }).Publish(); 

    } 

Non dimenticare di chiamare Connect() sul IConnectableObservable restituita

3

aggiunta alla risposta di Lee, utilizzando rxx:

using (new FileStream(@"filename.txt", FileMode.Open) 
     .ReadToEndObservable() 
     .Subscribe(x => Console.WriteLine(x.Length))) 
{ 
    Console.ReadKey(); 
} 

verrà emesso La lunghezza dei buffer di lettura.

+0

Dov'è la risposta di Lee? –

+0

Scomparsa per qualche motivo –

+0

La sua risposta è stata cancellata perché in realtà non forniva una risposta alla domanda - solo un collegamento a un [suo] libro –

1

Eh - andando riutilizzo uno dei miei altre risposte qui (o meglio, una parte di esso, comunque):

Rif: Reading from NetworkStream corrupts the buffer

In questo, ho un metodo di estensione in questo modo:

public static class Ext 
{   
    public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize) 
    {   
     // to hold read data 
     var buffer = new byte[bufferSize]; 
     // Step 1: async signature => observable factory 
     var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
      stream.BeginRead, 
      stream.EndRead); 
     return Observable.While(
      // while there is data to be read 
      () => stream.CanRead, 
      // iteratively invoke the observable factory, which will 
      // "recreate" it such that it will start from the current 
      // stream position - hence "0" for offset 
      Observable.Defer(() => asyncRead(buffer, 0, bufferSize)) 
       .Select(readBytes => buffer.Take(readBytes).ToArray())); 
    } 
} 

probabilmente si può utilizzare questo come scritto in forma in questo modo:

// Note: ToEnumerable works here because your filestream 
// has a finite length - don't do this with infinite streams! 
var blobboData = stream 
    .ReadObservable(bufferSize) 
    // take while we're still reading data 
    .TakeWhile(returnBuffer => returnBuffer.Length > 0) 
    .ToEnumerable() 
    // mash them all together 
    .SelectMany(buffer => buffer) 
    .ToArray(); 
+0

Ciao JerKimball, grazie per la soluzione. L'ho usato, quindi ho avuto condizioni di errore nel gestire il tempo. Come gestisci quando devi chiudere la connessione? Ho aperto la domanda http://stackoverflow.com/questions/32079292/reading-from-stream-using-observable-through-fromasyncpattern-how-to-close-canc riguardo a questo se è possibile verificarlo. –

+0

Rxx sembra farlo in modo molto diverso. Qualcuno può elaborare? https://github.com/RxDave/Rxx/blob/master/Main/Source/Rxx.Bindings-Net451/System/IO/StreamExtensions.cs –

+0

C'è un trucco con questa implementazione. Diversi utenti riceveranno diverse condivisioni dei dati del flusso! Si deve aggiungere un '.Publish(). Connect()' per garantire che tutti gli abbonati ottengano tutti i dati. –

1

È possibile utilizzare Repeat per mantenere le righe di lettura fino alla fine dello stream e Publish o Replay per controllare la condivisione tra più lettori.

Un esempio di una soluzione semplice e completa Rx per leggere linee da qualsiasi flusso fino alla fine sarà:

public static IObservable<string> ReadLines(Stream stream) 
{ 
    return Observable.Using(
     () => new StreamReader(stream), 
     reader => Observable.FromAsync(reader.ReadLineAsync) 
          .Repeat() 
          .TakeWhile(line => line != null)); 
} 

Questa soluzione sfrutta anche il fatto che ReadLine restituisce null quando la fine del flusso è raggiunto.

Problemi correlati