2012-11-27 21 views
7

Ho un programma in cui sto ricevendo eventi e voglio elaborarli in batch, in modo che tutti gli elementi che entrano mentre sto elaborando il batch corrente appariranno nel batch successivo .Estensioni reattive: buffer fino a quando il sottoscrittore è inattivo

I semplici metodi Buffer TimeSpan e conteggio in Rx mi daranno più lotti di elementi invece di darmi un grosso lotto di tutto ciò che è entrato (nei casi in cui l'abbonato impiega più tempo del TimeSpan specificato o più di N gli articoli arrivano e N è maggiore del conteggio).

Ho cercato di utilizzare i più complessi sovraccarichi di buffer che richiedono Func<IObservable<TBufferClosing>> o IObservable<TBufferOpening> and Func<TBufferOpening, IObservable<TBufferClosing>>, ma non riesco a trovare esempi su come utilizzarli, tanto meno capire come applicarli a ciò che sto cercando di fare.

+0

[Questa pagina] (http://leecampbell.blogspot.com.au/2011/03/rx-part-9join-window-buffer-and-group.html) potrebbe aiutare con sovraccarichi del buffer. L'intera serie è molto utile –

+0

Hai provato 'BufferBlock' in TPL Dataflow? – Asti

risposta

1

Fa questo ciò che vuoi?

var xs = new Subject<int>(); 
var ys = new Subject<Unit>(); 

var zss = 
    xs.Buffer(ys); 

zss 
    .ObserveOn(Scheduler.Default) 
    .Subscribe(zs => 
    { 
     Thread.Sleep(1000); 
     Console.WriteLine(String.Join("-", zs)); 
     ys.OnNext(Unit.Default); 
    }); 

ys.OnNext(Unit.Default); 
xs.OnNext(1); 
Thread.Sleep(200); 
xs.OnNext(2); 
Thread.Sleep(600); 
xs.OnNext(3); 
Thread.Sleep(400); 
xs.OnNext(4); 
Thread.Sleep(300); 
xs.OnNext(5); 
Thread.Sleep(900); 
xs.OnNext(6); 
Thread.Sleep(100); 
xs.OnNext(7); 
Thread.Sleep(1000); 

Il mio risultato:

1-2-3 
4-5 
6-7 
+0

La rimozione di ObserveOn per eseguire il programma su un thread provoca un'interruzione. –

+0

@ChrisEldredge - Sì, lo fa. Potrebbe essere necessario consentire il multi-threading per farlo funzionare. – Enigmativity

+0

Ciò comporterebbe anche un'attesa quando la coda è vuota (collega una CPU al 100%). Se rimuovi il sonno quando zs.Count == 0, vedrai il picco. –

1

Quello che vi serve è qualcosa per tamponare i valori e poi, quando il lavoratore è pronto chiede per il buffer corrente e quindi lo ripristina. Questo può essere fatto con una combinazione di RX e Task

class TicTac<Stuff> { 

    private TaskCompletionSource<List<Stuff>> Items = new TaskCompletionSource<List<Stuff>>(); 

    List<Stuff> in = new List<Stuff>(); 

    public void push(Stuff stuff){ 
     lock(this){ 
      if(in == null){ 
       in = new List<Stuff>(); 
       Items.SetResult(in); 
      } 
      in.Add(stuff); 
     } 
    } 

    private void reset(){ 
     lock(this){ 
      Items = new TaskCompletionSource<List<Stuff>>(); 
      in = null; 
     } 
    } 

    public async Task<List<Stuff>> Items(){ 
     List<Stuff> list = await Items.Task; 
     reset(); 
     return list; 
    } 
} 

poi

var tictac = new TicTac<double>(); 

IObservable<double> source = .... 

source.Subscribe(x=>tictac.Push(x)); 

Poi, nel tuo lavoratore

while(true){ 

    var items = await tictac.Items(); 

    Thread.Sleep(100); 

    for each (item in items){ 
     Console.WriteLine(item); 
    } 

} 
+1

Suppongo che funzioni, ma perché usare Reactive Extensions se devo codificare direttamente con TPL/APM? –

+0

ReactiveExtensions è un framework.Puoi estenderlo come desideri.Io spesso aggiungo i miei operatori per le cose personalizzate – bradgonesurfing

+2

@ChrisEldredge Per essere onesti, quello che stai chiedendo non è adatto per il framework Rx, non c'è modo per l'abbonato di segnalare all'osservatore che è inattivo o no, devi farlo fuori banda. – casperOne

1

Il modo in cui ho fatto prima è quello di tirare la Osserva il metodo in DotPeek/Reflector e prendi quel concetto di accodamento che ha e adattalo alle nostre esigenze. Ad esempio, nelle applicazioni di interfaccia utente con dati di spunta veloci (come la finanza) il thread dell'interfaccia utente può essere inondato di eventi e talvolta non può essere aggiornato abbastanza velocemente. In questi casi vogliamo eliminare tutti gli eventi tranne l'ultimo (per uno strumento particolare). In questo caso abbiamo modificato la coda interna di ObserveOn in un singolo valore di T (cercare ObserveLatestOn (IScheduler)). Nel tuo caso vuoi la Coda, tuttavia vuoi spingere l'intera coda non solo il primo valore. Questo dovrebbe farti cominciare.

Problemi correlati