2011-11-27 25 views
6

Come posso eseguire la seguente ripetizione osservabile finché stream.DataAvailable non è valido? Attualmente sembra che non si fermi mai.Ripeti rapidi TakeWhile provoca un ciclo infinito

AsyncReadChunk e Observable.Return all'interno della sezione Defer effettuare la chiamata OnNext quindi la chiamata OnCompleted. Quando Repeat riceve la chiamata OnNext, passa a TakeWhile. Quando TakeWhile non è soddisfatto, completa l'osservabile ma penso che l'OnCompleted che arriva subito dopo l'OnNext sia così veloce da far ripetere a Repeat l'osservabile e causare il loop infinito.

Come posso correggere questo comportamento?

public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize) 
{ 
    return Observable.Defer(() => 
     { 
      try 
      { 
       return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0]); 
      } 
      catch (Exception) 
      { 
       return Observable.Return(new byte[0]); 
      } 
     }) 
     .Repeat() 
     .TakeWhile((dataChunk, index) => dataChunk.Length > 0); 
} 
+4

Complimenti per capire come risolvere il problema e grazie per aver condiviso la soluzione. Tuttavia, potresti inserire la soluzione alla tua domanda come risposta anziché modificare la tua domanda? –

+0

Samet, ho spostato la tua auto-risposta fuori dalla domanda e in una risposta separata, contrassegnata come wiki della comunità. –

risposta

2

RISPOSTA DI AUTO: (seguito è una risposta inviato da Samet, l'autore della domanda, tuttavia, ha inviato la risposta, come parte della domanda che mi sto muovendo in un separato.. risposta, la marcatura come wiki comunità, in quanto l'autore non si è mosso da solo.)


ho scoperto da refactoring che si tratta di un problema con scheduler. Il reso utilizza lo schedulatore immediato mentre Ripeti utilizza CurrentThread. Il codice fisso è sotto.

public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize) 
    { 
     return Observable.Defer(() => 
            { 
             try 
             { 
              return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0], Scheduler.CurrentThread); 
             } 
             catch (Exception) 
             { 
              return Observable.Return(new byte[0], Scheduler.CurrentThread); 
             } 
            }) 
      .Repeat() 
      .TakeWhile((dataChunk, index) => dataChunk.Length > 0); 
    } 
Problemi correlati