2013-05-20 11 views
9

Sto cercando di avvolgere la mia attenzione sul supporto di Reactive Extensions per la concorrenza e sto avendo difficoltà a ottenere i risultati che sto cercando. Quindi non posso ottenerlo ancora.Estensioni reattive: Concorrenza all'interno dell'abbonato

Ho una fonte che emette i dati nel flusso più velocemente di quanto l'abbonato possa consumare. Preferirei configurare lo stream in modo tale che venga utilizzato un altro thread per richiamare il sottoscrittore per ogni nuovo elemento dallo stream, in modo che l'abbonato abbia più thread che lo attraversano contemporaneamente. Sono in grado di garantire la sicurezza del thread dell'abbonato.

Nell'esempio riportato di seguito viene illustrato il problema:

uscita
Observable.Interval(TimeSpan.FromSeconds(1)) 
    .Do(x => Console.WriteLine("{0} Thread: {1} Source value: {2}", 
           DateTime.Now, 
           Thread.CurrentThread.ManagedThreadId, x)) 
    .ObserveOn(NewThreadScheduler.Default) 
    .Subscribe(x => 
       { 
        Console.WriteLine("{0} Thread: {1} Observed value: {2}", 
            DateTime.Now, 
            Thread.CurrentThread.ManagedThreadId, x); 
        Thread.Sleep(5000); // Simulate long work time 
       }); 

La console si presenta così (date rimossi):

4:25:20 PM Thread: 6 Source value: 0 
4:25:20 PM Thread: 11 Observed value: 0 
4:25:21 PM Thread: 12 Source value: 1 
4:25:22 PM Thread: 12 Source value: 2 
4:25:23 PM Thread: 6 Source value: 3 
4:25:24 PM Thread: 6 Source value: 4 
4:25:25 PM Thread: 11 Observed value: 1 
4:25:25 PM Thread: 12 Source value: 5 
4:25:26 PM Thread: 6 Source value: 6 

Si prega di notare la "valore osservato" delta di tempo. Il sottoscrittore non viene richiamato in parallelo anche se l'origine continua a emettere dati più rapidamente di quanto il sottoscrittore può elaborarlo. Mentre posso immaginare una serie di scenari in cui il comportamento attuale sarebbe utile, ho bisogno di essere in grado di elaborare i messaggi non appena sono disponibili.

Ho provato diverse varianti di Pianificatori con il metodo ObserveOn, ma nessuno di loro sembra fare ciò che voglio.

Oltre a far scorrere un thread all'interno dell'azione Sottoscrivi per eseguire il lavoro a esecuzione prolungata, c'è qualcosa che mi manca che consentirà la consegna simultanea di dati al sottoscrittore?

Grazie in anticipo per tutte le risposte e suggerimenti!

+0

È passato un po 'di tempo dall'ultimo utilizzo di RX, ma non sarebbe stato meglio evitare la gestione manuale dei thread? Cioè, utilizzare TPL per generare un'attività in background nel metodo Subscribe, che è quindi in grado di restituire immediatamente. Per quello che posso dire, questo risolverebbe i tuoi problemi di concorrenza ed eviterebbe il rischio di generare molti thread (che altrimenti accadrebbe se la tua fonte fosse più veloce dei tuoi iscritti). –

risposta

11

Il problema fondamentale qui è che si desidera che Rx sia in grado di inviare eventi in un modo che infrange realmente le regole di funzionamento degli osservabili. Penso che sarebbe istruttivo osservare le linee guida di progettazione Rx qui: http://go.microsoft.com/fwlink/?LinkID=205219 - in particolare, "4.2 Assumere che le istanze dell'osservatore siano chiamate in modo serializzato". Ad esempio, non sei in grado di eseguire chiamate OnNext in parallelo. In effetti, il comportamento degli ordini di Rx è piuttosto centrale nella sua filosofia di design.

Se si guarda alla fonte, vedrai che Rx inforces questo comportamento nella classe ScheduledObserver<T> da cui ObserveOnObserver<T> deriva ... OnNexts vengono spediti da una coda interna e ciascuno deve completare prima di quello successivo viene inviato - all'interno del contesto di esecuzione indicato. Rx non consentirà l'esecuzione contemporanea delle chiamate OnNext di un singolo sottoscrittore.

Ciò non vuol dire che non sia possibile avere più abbonati in esecuzione a tariffe diverse. In realtà questo è facile capire se si modifica il codice come segue:

var source = Observable.Interval(TimeSpan.FromSeconds(1)) 
    .Do(x => Console.WriteLine("{0} Thread: {1} Source value: {2}", 
           DateTime.Now, 
           Thread.CurrentThread.ManagedThreadId, x)) 
    .ObserveOn(NewThreadScheduler.Default); 

var subscription1 = source.Subscribe(x => 
    { 
     Console.WriteLine("Subscriber 1: {0} Thread: {1} Observed value: {2}", 
          DateTime.Now, 
          Thread.CurrentThread.ManagedThreadId, x); 
     Thread.Sleep(1000); // Simulate long work time 
    }); 

var subscription2 = source.Subscribe(x => 
{ 
    Console.WriteLine("Subscriber 2: {0} Thread: {1} Observed value: {2}", 
         DateTime.Now, 
         Thread.CurrentThread.ManagedThreadId, x); 
    Thread.Sleep(5000); // Simulate long work time 
}); 

Ora vedrete Subscriber 1 ottenendo davanti abbonato 2.

Quello che non si può facilmente fare è chiedere a un osservabile fare qualcosa come l'invio di una chiamata OnNext ad un abbonato "pronto" - che è una specie di cosa stai chiedendo in modo indiretto. Presumo anche che tu non voglia veramente creare un nuovo thread per ogni OnNext in una situazione di consumo lento!

In questo scenario sembra che si stia meglio con un solo utente che non fa altro che spingere il lavoro su una coda il più velocemente possibile, il quale è a sua volta servito da un numero di thread di lavoro che è possibile controllare come necessario per tenere il passo.

+0

Grazie James. La tua spiegazione ha perfettamente senso. Grazie per il puntatore alle linee guida di progettazione ... copre un terreno che non ho visto nei libri e nei blog. – foomonkey42

+1

Benvenuto! Per inciso, ho scritto su un approccio efficiente per abbandonare gli eventi al fine di consentire a un consumatore di rimanere aggiornato qui: http://www.zerobugbuild.com/?p=192 Potrebbe essere rilevante per alcuni scenari. –

+2

Puoi anche fare cose come 'source.Select (x => Observable.Defer (() => HandleAsync (x))). Merge (5) .Subscribe (...)' dove 'HandleAsync (x)' è un metodo che gestisce quell'elemento e restituisce un'attività per segnalare quando viene eseguito. Questo modello è buono quando la tua fonte scoppia a volte più velocemente di quanto il tuo sottoscrittore può elaborare. Quanto sopra osserverà contemporaneamente fino a 5 risultati. Cordiali saluti All'interno della chiamata Iscriviti, stai effettivamente osservando i risultati delle attività. – Brandon

Problemi correlati