2009-02-07 20 views
7

C'è un modo per eseguire l'iterazione di stile foreach su enumerabili paralleli in C#? Per gli elenchi abbonati, so che è possibile utilizzare un ciclo regolare for iterando un int nell'intervallo dell'indice, ma in realtà preferisco loa for per diversi motivi.iterazione parallela in C#?

I punti di bonus se funziona in C# 2.0

+0

wouldn' t un ciclo for è una soluzione più semplice, più breve, leggibile invece della risposta Combine di seguito? quali sono le tue ragioni per preferire foreach in questo caso – Gishu

+0

Anche l'iterazione parallela è appena emersa in Ruby 1.9 quindi avrei scommesso su di esso non in C# come ora. LISP ce l'aveva però :) – Gishu

+0

Non sono sicuro di aver capito domanda correttamente. Stai cercando di eseguire iterazioni su più enumerabili in parallelo o stai cercando di eseguire il ciclo su una enumerabile, elaborando elementi diversi in parallelo? –

risposta

9

Risposta breve, no. foreach funziona su una sola enumerazione alla volta.

Tuttavia, se si combinano gli enumerabili in parallelo in uno solo, è possibile foreach sul combinato. Non sono a conoscenza di alcun semplice, costruita nel metodo di fare questo, ma il seguente dovrebbe funzionare (anche se non ho provato):

public IEnumerable<TSource[]> Combine<TSource>(params object[] sources) 
{ 
    foreach(var o in sources) 
    { 
     // Choose your own exception 
     if(!(o is IEnumerable<TSource>)) throw new Exception(); 
    } 

    var enums = 
     sources.Select(s => ((IEnumerable<TSource>)s).GetEnumerator()) 
     .ToArray(); 

    while(enums.All(e => e.MoveNext())) 
    { 
     yield return enums.Select(e => e.Current).ToArray(); 
    } 
} 

Quindi è possibile foreach sul restituita enumerabile:

foreach(var v in Combine(en1, en2, en3)) 
{ 
    // Remembering that v is an array of the type contained in en1, 
    // en2 and en3. 
} 
+0

Perché hai scelto oggetto [] invece di IEnumerable come tipo di parametro? Eliminerebbe l'eccezione, no? – mafu

+0

C'era almeno una versione del linguaggio C# che non compilava parametri con qualcosa di diverso dall'oggetto []. Considerando questa risposta ha cinque anni, direi che l'unica versione che avevo usato a quel punto era quella che non funzionava con 'params IEnumerable [] sources'. In questi giorni, ovviamente, userei la tipizzazione più esplicita. Probabilmente userò anche il metodo di estensione 'IEnumerable .Zip' per due enumerabili - abbastanza sicuro che anche questo non esistesse cinque anni fa. – Zooba

0

Questo lavoro potrebbe funzionare?

public static class Parallel 
{ 
    public static void ForEach<T>(IEnumerable<T>[] sources, 
            Action<T> action) 
    { 
     foreach (var enumerable in sources) 
     { 
      ThreadPool.QueueUserWorkItem(source => { 
       foreach (var item in (IEnumerable<T>)source) 
        action(item); 
      }, enumerable); 
     } 
    } 
} 

// sample usage: 
static void Main() 
{ 
    string[] s1 = { "1", "2", "3" }; 
    string[] s2 = { "4", "5", "6" }; 
    IEnumerable<string>[] sources = { s1, s2 }; 
    Parallel.ForEach(sources, s => Console.WriteLine(s)); 
    Thread.Sleep(0); // allow background threads to work 
} 

Per C# 2.0, è necessario convertire le espressioni lambda di cui sopra in delegati.

Nota: questo metodo di utilità utilizza thread in background. Si consiglia di modificarlo per utilizzare i thread in primo piano e probabilmente si vorrà attendere fino al termine di tutti i thread. Se lo fai, ti suggerisco di creare i thread sources.Length - 1 e di utilizzare il thread di esecuzione corrente per l'ultima (o prima) fonte.

(Vorrei poter includere in attesa che le discussioni per finire nel mio codice, ma mi dispiace che io non so come fare quello ancora. Credo che si dovrebbe usare un WaitHandleThread.Join().)

11

BlockingCollection di .NET 4 lo rende piuttosto facile. Crea un BlockingCollection, restituisci il suo .GetConsumingEnumerable() nel metodo enumerabile. Quindi il foreach aggiunge semplicemente alla raccolta di blocchi.

E.g.

private BlockingCollection<T> m_data = new BlockingCollection<T>(); 

public IEnumerable<T> GetData(IEnumerable<IEnumerable<T>> sources) 
{ 
    Task.Factory.StartNew(() => ParallelGetData(sources)); 
    return m_data.GetConsumingEnumerable(); 
} 

private void ParallelGetData(IEnumerable<IEnumerable<T>> sources) 
{ 
    foreach(var source in sources) 
    { 
     foreach(var item in source) 
     { 
      m_data.Add(item); 
     }; 
    } 

    //Adding complete, the enumeration can stop now 
    m_data.CompleteAdding(); 
} 

Spero che questo aiuti. BTW ho posted a blog about this ieri sera

Andre

+1

Questo dovrebbe essere contrassegnato come corretto ora che .net 4.0 è fuori –

+0

Questo non risponde alla domanda, che riguarda enumerare più enumerabili insieme. –

+0

Sei assolutamente corretto Arthur, lo aggiusterò – Andre

3

ho scritto un'implementazione del EachParallel() dalla libreria Parallel .NET4. È compatibile con .NET 3.5: Parallel ForEach Loop in C# 3.5 Uso:

string[] names = { "cartman", "stan", "kenny", "kyle" }; 
names.EachParallel(name => 
{ 
    try 
    { 
     Console.WriteLine(name); 
    } 
    catch { /* handle exception */ } 
}); 

Implementazione:

/// <summary> 
/// Enumerates through each item in a list in parallel 
/// </summary> 
public static void EachParallel<T>(this IEnumerable<T> list, Action<T> action) 
{ 
    // enumerate the list so it can't change during execution 
    list = list.ToArray(); 
    var count = list.Count(); 

    if (count == 0) 
    { 
     return; 
    } 
    else if (count == 1) 
    { 
     // if there's only one element, just execute it 
     action(list.First()); 
    } 
    else 
    { 
     // Launch each method in it's own thread 
     const int MaxHandles = 64; 
     for (var offset = 0; offset < list.Count()/MaxHandles; offset++) 
     { 
      // break up the list into 64-item chunks because of a limitiation    // in WaitHandle 
      var chunk = list.Skip(offset * MaxHandles).Take(MaxHandles); 

      // Initialize the reset events to keep track of completed threads 
      var resetEvents = new ManualResetEvent[chunk.Count()]; 

      // spawn a thread for each item in the chunk 
      int i = 0; 
      foreach (var item in chunk) 
      { 
       resetEvents[i] = new ManualResetEvent(false); 
       ThreadPool.QueueUserWorkItem(new WaitCallback((object data) => 
       { 
        int methodIndex = (int)((object[])data)[0]; 

        // Execute the method and pass in the enumerated item 
        action((T)((object[])data)[1]); 

        // Tell the calling thread that we're done 
        resetEvents[methodIndex].Set(); 
       }), new object[] { i, item }); 
       i++; 
      } 

      // Wait for all threads to execute 
      WaitHandle.WaitAll(resetEvents); 
     } 
    } 
} 
1

Se si vuole attenersi ai principi fondamentali - Ho riscritto la risposta attualmente accettato in un modo più semplice:

public static IEnumerable<TSource[]> Combine<TSource> (this IEnumerable<IEnumerable<TSource>> sources) 
    { 
     var enums = sources 
      .Select (s => s.GetEnumerator()) 
      .ToArray(); 

     while (enums.All (e => e.MoveNext())) { 
      yield return enums.Select (e => e.Current).ToArray(); 
     } 
    } 

    public static IEnumerable<TSource[]> Combine<TSource> (params IEnumerable<TSource>[] sources) 
    { 
     return sources.Combine(); 
    }