2013-07-24 9 views
6

ho adottato la mia realizzazione del parallelo/consumatore in base al codice di this questionParallel.ForEach fase di stallo se integrato con BlockingCollection

class ParallelConsumer<T> : IDisposable 
{ 
    private readonly int _maxParallel; 
    private readonly Action<T> _action; 
    private readonly TaskFactory _factory = new TaskFactory(); 
    private CancellationTokenSource _tokenSource; 
    private readonly BlockingCollection<T> _entries = new BlockingCollection<T>(); 
    private Task _task; 

    public ParallelConsumer(int maxParallel, Action<T> action) 
    { 
     _maxParallel = maxParallel; 
     _action = action; 
    } 

    public void Start() 
    { 
     try 
     { 
      _tokenSource = new CancellationTokenSource(); 
      _task = _factory.StartNew(
       () => 
       { 
        Parallel.ForEach(
         _entries.GetConsumingEnumerable(), 
         new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token }, 
         (item, loopState) => 
         { 
          Log("Taking" + item); 
          if (!_tokenSource.IsCancellationRequested) 
          { 
           _action(item); 
           Log("Finished" + item); 
          } 
          else 
          { 
           Log("Not Taking" + item); 
           _entries.CompleteAdding(); 
           loopState.Stop(); 
          } 
         }); 
       }, 
       _tokenSource.Token); 
     } 
     catch (OperationCanceledException oce) 
     { 
      System.Diagnostics.Debug.WriteLine(oce); 
     } 
    } 

    private void Log(string message) 
    { 
     Console.WriteLine(message); 
    } 

    public void Stop() 
    { 
     Dispose(); 
    } 

    public void Enqueue(T entry) 
    { 
     Log("Enqueuing" + entry); 
     _entries.Add(entry); 
    } 

    public void Dispose() 
    { 
     if (_task == null) 
     { 
      return; 
     } 

     _tokenSource.Cancel(); 
     while (!_task.IsCanceled) 
     { 
     } 

     _task.Dispose(); 
     _tokenSource.Dispose(); 
     _task = null; 
    } 
} 

Ed ecco un codice di prova

class Program 
{ 
    static void Main(string[] args) 
    { 
     TestRepeatedEnqueue(100, 1); 
    } 

    private static void TestRepeatedEnqueue(int itemCount, int parallelCount) 
    { 
     bool[] flags = new bool[itemCount]; 
     var consumer = new ParallelConsumer<int>(parallelCount, 
               (i) => 
               { 
                flags[i] = true; 
               } 
      ); 
     consumer.Start(); 
     for (int i = 0; i < itemCount; i++) 
     { 
      consumer.Enqueue(i); 
     } 
     Thread.Sleep(1000); 
     Debug.Assert(flags.All(b => b == true)); 



    } 
} 

il test fallisce sempre - si è sempre bloccato intorno al 93 ° oggetto dei 100 testati. Qualche idea su quale parte del mio codice abbia causato questo problema e su come risolverlo?

risposta

8

Non è possibile utilizzare Parallel.Foreach() con BlockingCollection.GetConsumingEnumerable(), come si è scoperto.

Per una spiegazione, vedere questo post del blog:

http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx

Questo blog fornisce anche il codice sorgente di un metodo chiamato GetConsumingPartitioner() che è possibile utilizzare per risolvere il problema.

Estratto dal blog: implementazione GetConsumingEnumerable

di BlockingCollection sta usando la sincronizzazione interna di BlockingCollection che supporta già più consumatori in concomitanza, ma PerOgni non sa che, e la sua logica enumerabile-partizionamento ha anche bisogno di prendere una serratura mentre accedendo alla enumerabile.

In questo modo, c'è più sincronizzazione qui di quanto sia effettivamente necessario, determinando un impatto sulle prestazioni potenzialmente non trascurabile.

[Inoltre] l'algoritmo di partizionamento utilizzato in modo predefinito da Parallel.ForEach e PLINQ utilizza il chunking per ridurre al minimo i costi di sincronizzazione (un pezzo), quindi rilasciare il lucchetto.

Mentre questo design può aiutare con il throughput complessivo, per gli scenari che si concentrano maggiormente sulla bassa latenza, tale chunking può essere proibitivo.

+0

Grazie. Questo ha risolto il mio problema. Ad ogni modo, quando provo ulteriormente, il codice nel mio OP non fallisce quando il numero di elementi è un membro di questa sequenza, [A200672] (http://oeis.org/A200672) ad es. 1, 2, 3, 5, 7, 9, 13, 17, 21, 29, 37, 45, 61, 77, 93, ... Qualche idea del perché? solo curioso. – user69715

+0

@ user69715 Questo è il tipo di comportamento strano che ho trovato quando ho provato a fare una cosa simile. Immagino che sia solo una strana interazione tra Parallel.ForEach() e il sottostante BlockingCollection, ma non posso davvero spiegarlo. –

2

La ragione di fallimento è causa del seguente motivo come spiegato here

L'algoritmo di partizionamento impiegato da default sia Parallel.ForEach e l'uso PLINQ chunking per minimizzare i costi di sincronizzazione : piuttosto di prendere il lucchetto una volta per elemento, prenderà il lucchetto, afferrerà un gruppo di elementi (un pezzo), e quindi rilascia il blocco.

Per farlo funzionare, è possibile aggiungere un metodo su vostra classe ParallelConsumer<T> per indicare che l'aggiunta è stata completata, come di seguito

public void StopAdding() 
    { 
     _entries.CompleteAdding(); 
    } 

E ora chiamare questo metodo dopo la vostra for loop, come di seguito

Altrimenti, Parallel.ForEach() aspetterebbe che venga raggiunta la soglia in modo da afferrare il blocco e avviare l'elaborazione.

+0

la cosa è in produzione, le attività vengono accodate in continuazione, quindi contrassegnare "StopAdding" non aiuta. Grazie per la risposta, +1, ma vado con l'altra risposta. – user69715

+0

Oops, sembra che non riesco ancora a fare +1 – user69715