2012-06-22 10 views
8

Qualcuno sa come scrivere una funzione di estensione restituendo ParallelQuery in PLINQ?Come si scrive una funzione di estensione compatibile con il thread per PLINQ?

In particolare, ho il seguente problema: Voglio eseguire una trasformazione all'interno di una query PLINQ che richiede un motore, la cui creazione è costosa e non è possibile accedervi contemporaneamente.

ho potuto fare il seguente:

var result = source.AsParallel().Select ((i) => { var e = new Engine(); return e.Process(i); }) 

Qui, viene creato il motore una volta per ogni voce, che è troppo costoso.

Desidero creare il motore una volta per thread.

con inerti, posso venire vicino a quello che voglio con qualcosa di simile

// helper class: engine to use plus list of results obtained in thread so far 
class EngineAndResults { 
    public Engine engine = null; 
    public IEnumerable<ResultType> results; 
} 

var result = source.AsParallel().Aggregate (

    // done once per block of items (=thread), 
    // returning an empty list, but a new engine 
    () => new EngineAndList() { 
     engine = new Engine(), 
     results = Enumerable.Empty<ResultType>() 
    }, 

    // we process a new item and put it to the thread-local list, 
    // preserving the engine for further use 
    (engineAndResults, item) => new EngineAndResults() { 
     engine = engineAndResults.engine, 
     results = Enumerable.Concat (
      engineAndResults.results, 
      new ResultType [] { engineAndResults.engine.Process (item) } 
     ) 
    }, 

    // tell linq how to aggregate across threads 
    (engineAndResults1, engineAndResults2) => new EngineAndResults() { 
     engine = engineAndResults1.engine, 
     results = Enumerable.Concat (engineAndResults1.results, engineAndResults2.results) 
    }, 

    // after all aggregations, how do we come to the result? 
    engineAndResults => engineAndResults.results 
); 

Come potete vedere, io uso improprio l'accumulatore di portare un motore per thread. Il problema qui è che PLINQ alla fine aggrega i risultati in un singolo oggetto IEnumerable, che fa sì che i thread siano sincronizzati. Questo non è molto bello se voglio aggiungere un'altra estensione PLINQ in seguito.

Gradirei qualcosa come

var result = source.AsParallel() 
        .SelectWithThreadwiseInitWhichIAmLookingFor (
         () => new Engine(), 
         (engine, item) => engine.Process (item) 
      ) 

Qualcuno ha idea di come raggiungere questo obiettivo?

risposta

5

Per eseguire questa operazione è possibile utilizzare ThreadLocal<T>. Qualcosa di simile:

var engine = new ThreadLocal<Engine>(() => new Engine()); 
var result = source.AsParallel() 
        .Select(item => engine.Value.Process(item)); 
+0

Grazie. Questa è una buona soluzione. Ho fatto un breve test e sembra funzionare bene. Ho provato a trovare un modo per inserire l'inizializzazione nella funzione di estensione, ma non ci sono riuscito - ovviamente ThreadLocal deve essere creato prima che venga chiamato AsParallel. Non vedo la ragione per questo, ma comunque, questo non è un grosso problema. – JohnB

+0

Penso che non abbia funzionato, perché stavi creando un nuovo 'ThreadLocal' per ogni iterazione, quindi non ci potrebbe essere alcuna condivisione per le iterazioni che si eseguono sullo stesso thread. Tutte le iterazioni eseguite sullo stesso thread necessitano della stessa istanza di 'ThreadLocal'. – svick

Problemi correlati