2011-12-19 30 views
6

Ho un pezzo di codice che utilizza Parallel.ForEach, probabilmente basato su una vecchia versione di estensioni Rx o sulla libreria parallela di attività. Ho installato una versione corrente delle estensioni Rx ma non riesco a trovare Parallel.ForEach. Non sto usando qualsiasi altra roba di fantasia della biblioteca e vogliono solo per elaborare alcuni dati in parallelo in questo modo:Estensioni Rx: dove si trova Parallel.ForEach?

Parallel.ForEach(records, ProcessRecord); 

ho trovato this question, ma non vorrei dipendere da una vecchia versione di Rx. Ma non ero in grado di trovare qualcosa di simile per Rx, quindi qual è il modo attuale e più diretto per farlo usando una versione Rx attuale? Il progetto utilizza .NET 3.5.

+2

Aggiornamento a .NET 4.0? – dtb

+0

possibile duplicato di [Parallel.ForEach mancante da Reactive Extensions per .Net 3.5] (http://stackoverflow.com/questions/7398962/parallel-foreach-missing-from-reactive-extensions-for-net-3-5) –

+1

@Hasan: mi sono collegato alla domanda che hai citato, quindi ovviamente ne ero consapevole. Ma la risposta sta proponendo di usare una vecchia versione Rx, che non voglio usare. – Achim

risposta

24

Non c'è bisogno di fare tutto questo stupido goosery se si dispone di Rx:

records.ToObservable() 
    .SelectMany(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler)) 
    .ToList() 
    .First(); 

(O, se si desidera che l'ordine degli elementi mantenuto al costo di efficienza):

records.ToObservable() 
    .Select(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler)) 
    .Concat() 
    .ToList() 
    .First(); 

Oppure, se si desidera limitare il numero di elementi allo stesso tempo:

records.ToObservable() 
    .Select(x => Observable.Defer(() => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler))) 
    .Merge(5 /* at a time */) 
    .ToList() 
    .First(); 
+0

Come procedete a limitare questo solo all'elaborazione di n elementi in un dato momento? Sto provando questo con centinaia di elementi e sta provando a farli tutti in una volta, il che sta portando a enormi problemi di memoria nella mia app. Mi piacerebbe essere in grado di limitare a fare 5 alla volta dire. – Clint

+2

Aggiornato per includere la limitazione della concorrenza –

1

Ecco una semplice sostituzione:

class Parallel 
{ 
    public static void ForEach<T>(IEnumerable<T> source, Action<T> body) 
    { 
     if (source == null) 
     { 
      throw new ArgumentNullException("source"); 
     } 
     if (body == null) 
     { 
      throw new ArgumentNullException("body"); 
     } 
     var items = new List<T>(source); 
     var countdown = new CountdownEvent(items.Count); 
     WaitCallback callback = state => 
     { 
      try 
      { 
       body((T)state); 
      } 
      finally 
      { 
       countdown.Signal(); 
      } 
     }; 
     foreach (var item in items) 
     { 
      ThreadPool.QueueUserWorkItem(callback, item); 
     } 
     countdown.Wait(); 
    } 
} 
+0

Sembra promettente, ma come fa il codice a sapere quando tutti gli articoli sono stati elaborati? Parallelamente. Per ogni blocco fino a quando tutti gli oggetti sono stati elaborati, quindi non me ne devo preoccupare. – Achim

+1

Ovviamente è una semplificazione di ciò che fa Parallel.ForEach. Ho esteso la mia risposta al blocco fino a quando tutti gli elementi sono stati elaborati. – dtb

+2

Vale la pena considerare quale sia il trade-off rispetto all'aggiornamento alla 4.0. Gli autori di 'Parallel.ForEach' hanno lavorato molto per ottenere un buon equilibrio tra grado di concorrenza e quantità di spese generali. @dtb d'altra parte ha dato una buona risposta che occupa circa 30LoC. Se trovi questa risposta fa tutto ciò che ti serve, quindi giorni felici. Se ti ritrovi a modificare e tweaking e il codice cresce fino a centinaia di LoC, allora non hai più una buona versione breve, ma una brutta versione lunga in cui essenzialmente stai riscrivendo sempre più di quello che 4.0 'Parallel' già ha provato –