2014-10-28 16 views
13

Ho codice in questo modo:Come eseguire istruzioni "let" LINQ in parallelo?

var list = new List<int> {1, 2, 3, 4, 5}; 

var result = from x in list.AsParallel() 
      let a = LongRunningCalc1(x) 
      let b = LongRunningCalc2(x) 
      select new {a, b}; 

Diciamo che i metodi LongRunningCalc ogni prendere 1 secondo. Il codice sopra richiede circa 2 secondi per essere eseguito, perché mentre l'elenco di 5 elementi viene gestito in parallelo, i due metodi chiamati dalle istruzioni let sono chiamati in modo sequenziale.

Tuttavia, questi metodi possono essere tranquillamente chiamati anche in parallelo. Ovviamente hanno bisogno di fondersi per il select ma fino ad allora dovrebbe essere eseguito in parallelo - il select dovrebbe aspettare per loro.

C'è un modo per raggiungere questo obiettivo?

risposta

7

Non sarà in grado di utilizzare la sintassi di query o l'operazione let, ma è possibile scrivere un metodo per eseguire più operazioni per ogni elemento in parallelo:

public static ParallelQuery<TFinal> SelectAll<T, TResult1, TResult2, TFinal>(
    this ParallelQuery<T> query, 
    Func<T, TResult1> selector1, 
    Func<T, TResult2> selector2, 
    Func<TResult1, TResult2, TFinal> resultAggregator) 
{ 
    return query.Select(item => 
    { 
     var result1 = Task.Run(() => selector1(item)); 
     var result2 = Task.Run(() => selector2(item)); 
     return resultAggregator(result1.Result, result2.Result); 
    }); 
} 

Questo permetterebbe di scrivere:

var query = list.AsParallel() 
    .SelectAll(LongRunningCalc1, 
     LongRunningCalc2, 
     (a, b) => new {a, b}) 

È possibile aggiungere sovraccarichi per le operazioni parallele supplementari pure:

public static ParallelQuery<TFinal> SelectAll<T, TResult1, TResult2, TResult3, TFinal> 
    (this ParallelQuery<T> query, 
    Func<T, TResult1> selector1, 
    Func<T, TResult2> selector2, 
    Func<T, TResult3> selector3, 
    Func<TResult1, TResult2, TResult3, TFinal> resultAggregator) 
{ 
    return query.Select(item => 
    { 
     var result1 = Task.Run(() => selector1(item)); 
     var result2 = Task.Run(() => selector2(item)); 
     var result3 = Task.Run(() => selector3(item)); 
     return resultAggregator(
      result1.Result, 
      result2.Result, 
      result3.Result); 
    }); 
} 

E 'possibile scrivere una versione di gestire un numero di selettori non noti al momento della compilazione, ma per fare che tutti hanno bisogno di calcolare un valore dello stesso tipo:

public static ParallelQuery<IEnumerable<TResult>> SelectAll<T, TResult>(
    this ParallelQuery<T> query, 
    IEnumerable<Func<T, TResult>> selectors) 
{ 
    return query.Select(item => selectors.AsParallel() 
      .Select(selector => selector(item)) 
      .AsEnumerable()); 
} 
public static ParallelQuery<IEnumerable<TResult>> SelectAll<T, TResult>(
    this ParallelQuery<T> query, 
    params Func<T, TResult>[] selectors) 
{ 
    return SelectAll(query, selectors); 
} 
+0

Questa non è la mia area di esperti, ma non è necessario attendere i risultati delle attività? – Magnus

+0

@Magnus Sono già d'accordo. – Servy

+0

Yeh che chiama '.Result' attende l'esecuzione del Task. Ho semplificato un po 'il mio codice di esempio, ma questo mi ha messo sulla strada giusta, grazie per l'aiuto. – Lyall

0

vorrei farlo usando Microsoft reattiva Framework ("Rx-Main" in NuGet).

Eccolo:

var result = 
    from x in list.ToObservable() 
    from a in Observable.Start(() => LongRunningCalc1(x)) 
    from b in Observable.Start(() => LongRunningCalc2(x)) 
    select new {a, b}; 

La cosa bella è che è possibile accedere i risultati come sono prodotte con il metodo .Subscribe(...):

result.Subscribe(x => /* Do something with x.a and/or x.b */); 

Super semplice!

Problemi correlati