Sto cercando di avvolgere la mia attenzione sul supporto di Reactive Extensions per la concorrenza e sto avendo difficoltà a ottenere i risultati che sto cercando. Quindi non posso ottenerlo ancora.Estensioni reattive: Concorrenza all'interno dell'abbonato
Ho una fonte che emette i dati nel flusso più velocemente di quanto l'abbonato possa consumare. Preferirei configurare lo stream in modo tale che venga utilizzato un altro thread per richiamare il sottoscrittore per ogni nuovo elemento dallo stream, in modo che l'abbonato abbia più thread che lo attraversano contemporaneamente. Sono in grado di garantire la sicurezza del thread dell'abbonato.
Nell'esempio riportato di seguito viene illustrato il problema:
uscitaObservable.Interval(TimeSpan.FromSeconds(1))
.Do(x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
DateTime.Now,
Thread.CurrentThread.ManagedThreadId, x))
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(x =>
{
Console.WriteLine("{0} Thread: {1} Observed value: {2}",
DateTime.Now,
Thread.CurrentThread.ManagedThreadId, x);
Thread.Sleep(5000); // Simulate long work time
});
La console si presenta così (date rimossi):
4:25:20 PM Thread: 6 Source value: 0
4:25:20 PM Thread: 11 Observed value: 0
4:25:21 PM Thread: 12 Source value: 1
4:25:22 PM Thread: 12 Source value: 2
4:25:23 PM Thread: 6 Source value: 3
4:25:24 PM Thread: 6 Source value: 4
4:25:25 PM Thread: 11 Observed value: 1
4:25:25 PM Thread: 12 Source value: 5
4:25:26 PM Thread: 6 Source value: 6
Si prega di notare la "valore osservato" delta di tempo. Il sottoscrittore non viene richiamato in parallelo anche se l'origine continua a emettere dati più rapidamente di quanto il sottoscrittore può elaborarlo. Mentre posso immaginare una serie di scenari in cui il comportamento attuale sarebbe utile, ho bisogno di essere in grado di elaborare i messaggi non appena sono disponibili.
Ho provato diverse varianti di Pianificatori con il metodo ObserveOn, ma nessuno di loro sembra fare ciò che voglio.
Oltre a far scorrere un thread all'interno dell'azione Sottoscrivi per eseguire il lavoro a esecuzione prolungata, c'è qualcosa che mi manca che consentirà la consegna simultanea di dati al sottoscrittore?
Grazie in anticipo per tutte le risposte e suggerimenti!
È passato un po 'di tempo dall'ultimo utilizzo di RX, ma non sarebbe stato meglio evitare la gestione manuale dei thread? Cioè, utilizzare TPL per generare un'attività in background nel metodo Subscribe, che è quindi in grado di restituire immediatamente. Per quello che posso dire, questo risolverebbe i tuoi problemi di concorrenza ed eviterebbe il rischio di generare molti thread (che altrimenti accadrebbe se la tua fonte fosse più veloce dei tuoi iscritti). –