2013-07-21 9 views
7

Attualmente sto cercando di avvolgere la mia mente sulla concorrenza con RX .NET e di confondermi con qualcosa. Voglio eseguire quattro attività relativamente lente in parallelo, quindi ho pensato che fosse la NewThreadScheduler.Default la strada da percorrere, dato che è "Rappresenta un oggetto che pianifica ogni unità di lavoro su un thread separato.".NewThreadScheduler.Programmi predefiniti tutti funzionano sullo stesso thread

Ecco il mio codice di installazione:

static void Test() 
    { 
     Console.WriteLine("Starting. Thread {0}", Thread.CurrentThread.ManagedThreadId); 

     var query = Enumerable.Range(1, 4); 
     var obsQuery = query.ToObservable(NewThreadScheduler.Default); 
     obsQuery.Subscribe(DoWork, Done); 

     Console.WriteLine("Last line. Thread {0}", Thread.CurrentThread.ManagedThreadId); 
    } 

    static void DoWork(int i) 
    { 
     Thread.Sleep(500); 
     Console.WriteLine("{0} Thread {1}", i, Thread.CurrentThread.ManagedThreadId); 
    } 

    static void Done() 
    { 
     Console.WriteLine("Done. Thread {0}", Thread.CurrentThread.ManagedThreadId); 
    } 

ho assunto la "X Discussione Y" sarebbe uscita un thread diverso id ogni volta, ma la produzione effettiva è:

Starting. Thread 1 
Last line. Thread 1 
1 Thread 3 
2 Thread 3 
3 Thread 3 
4 Thread 3 
Done. Thread 3 

Tutto il lavoro è essere uno sullo stesso nuovo thread in ordine sequenziale, che non è quello che mi aspettavo.

Presumo che mi manchi qualcosa, ma non riesco a capire cosa.

risposta

8

Esistono due parti per una query osservabile, lo e lo Subscription. (Questa è anche la differenza tra gli operatori ObserveOn e SubscribeOn.)

tuo Query è

Enumerable 
    .Range(1, 4) 
    .ToObservable(NewThreadScheduler.Default); 

Questo crea un osservabile che produce valori sul default NewThreadScheduler per quel sistema.

la vostra sottoscrizione è

obsQuery.Subscribe(DoWork, Done); 

Questo viene eseguito DoWork per ogni valore prodotto dal Query e Done quando i Query finiture con un OnComplete chiamata. Non penso ci siano garanzie su quale thread verranno chiamate le funzioni nel metodo subscribe, in pratica se tutti i valori della query sono prodotti sullo stesso thread che è il thread su cui verrà eseguita la sottoscrizione. Sembra che lo stiano facendo in modo che tutte le chiamate di sottoscrizione siano fatte sullo stesso thread, cosa che molto probabilmente è fatta per eliminare molti errori multi-threading comuni.

in modo da avere due problemi, uno è con la vostra registrazione, se si cambia Query a

Enumerable 
    .Range(1, 4) 
    .Do(x => Console.WriteLine("Query Value {0} produced on Thread {1}", x, Thread.CurrentThread.ManagedThreadId); 
    .ToObservable(NewThreadScheduler.Default); 

Vedrete ogni valore prodotto in un nuovo thread.

L'altro problema è una delle intenzioni e del design di Rx. È destinato che il Query è il processo di lunga durata e il Subscription è un metodo breve che si occupa dei risultati. Se si desidera eseguire una funzione di lunga durata come Rx Osservabile, l'opzione migliore è utilizzare Observable.ToAsync.

+0

Grazie per questo. Non solo stai chiarendo cosa sto facendo male, ma penso anche che quello che dici riguardo l'intenzione di Rx ha molto senso. –

+0

La risposta sopra sembra essere superata, il metodo Do non è più disponibile su Enumerable. – VivekDev

+0

Perché il metodo Do funzioni, è necessario installare System.Interactive pacchetto nuget – VivekDev

Problemi correlati