2012-03-05 3 views
7

Ho cercato esempi su come utilizzare Observable.Buffer in rx ma non riesco a trovare nulla di più sostanzioso rispetto al tempo del buffer della piastra della caldaia.E 'possibile vedere Observable.Buffer su un valore diverso dal tempo

Sembra esserci un sovraccarico per specificare un "bufferClosingSelector" ma non riesco a spiegarmi.

Quello che sto cercando di fare è creare una sequenza di buffer in base al tempo o da un "accumulo". Considera uno stream di richieste in cui ogni richiesta ha un qualche tipo di peso e non voglio elaborare più di x peso accumulato alla volta, o se non è stato accumulato abbastanza basta darmi quello che è arrivato nell'ultimo periodo di tempo (Buffer regolare funzionalità)

risposta

13

bufferClosingSelector è una funzione chiamata ogni volta per ottenere un Observable che produrrà un valore quando si prevede che il buffer venga chiuso.

Per esempio,

source.Buffer(() => Observable.Timer(TimeSpan.FromSeconds(1))) opere come il regolare Buffer(time) sovraccarico.

Se si desidera ponderare una sequenza, è possibile applicare uno Scan sulla sequenza e quindi decidere la condizione di aggregazione.

Ad esempio, source.Scan((a,c) => a + c).SkipWhile(a => a < 100) ti dà una sequenza che produce un valore quando la sequenza fonte ha aggiunto fino a più di 100.

È possibile utilizzare Amb di correre queste due condizioni di chiusura per vedere che reagisce prima:

 .Buffer(() => Observable.Amb 
        (
          Observable.Timer(TimeSpan.FromSeconds(1)), 
          source.Scan((a,c) => a + c).SkipWhile(a => a < 100) 
        ) 
       ) 

È possibile utilizzare qualsiasi serie di combinatori che produca qualsiasi valore per il buffer da chiudere in quel punto.

Nota: Il valore assegnato al selettore di chiusura non ha importanza: è la notifica che conta. Pertanto, per combinare fonti di diverso tipo con Amb, è sufficiente cambiarlo in System.Reactive.Unit.

Observable.Amb(stream1.Select(_ => new Unit()), stream2.Select(_ => new Unit()) 
+0

Solo una breve nota, amb non sembra funzionare quando la sorgente è un osservabile di tipo diverso da allora lungo – Dmitry

+0

@Dmitry stavo solo dando l'idea di base. L'ho modificato per includere un esempio di tipi diversi. – Asti

+0

E 'possibile accedere al valore di chiusura del buffer dall'osservatore? Per esempio. il buffer timestamp utilizza per chiudere. – liang

Problemi correlati