2016-04-12 18 views
10

Ho una raccolta di 1000 messaggi di input da elaborare. Sto collegando la raccolta di input e avviando la nuova attività per ciascun messaggio per essere elaborata.Come limitare il numero massimo di attività parallele in C#

//Assume this messages collection contains 1000 items 
var messages = new List<string>(); 

foreach (var msg in messages) 
{ 
    Task.Factory.StartNew(() => 
    { 
    Process(msg); 
    }); 
} 

Possiamo indovinare quanti messaggi massimo contemporaneamente ottenere elaborati al momento (supponendo normale microprocessore Quad), oppure possiamo limitare il numero massimo di messaggi da elaborare al momento?

Come garantire che questo messaggio venga elaborato nella stessa sequenza/ordine della raccolta?

+0

Come dividere i messaggi in batch ed eseguire ogni batch in parallelo? – bit

risposta

7

SemaphoreSlim è una soluzione molto buona in questo caso e mi raccomando altamente OP di provare questo , ma la risposta di @ Manoj ha un difetto, come menzionato nei commenti. Si dovrebbe attendere l'emittente prima di generare l'attività in questo modo.

Risposta Aggiornato: Come @Vasyl sottolineato semaforo può essere smaltito prima del completamento dei compiti e solleverà un'eccezione quando il metodo Release() viene chiamato in modo prima di uscire dal blocco utilizzando deve attendere il completamento di tutte le attività create.

int maxConcurrency=10; 
var messages = new List<string>(); 
using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency)) 
{ 
    List<Task> tasks = new List<Task>(); 
    foreach(var msg in messages) 
    { 
     concurrencySemaphore.Wait(); 

     var t = Task.Factory.StartNew(() => 
     { 

      try 
      { 
       Process(msg); 
      } 
      finally 
      { 
       concurrencySemaphore.Release(); 
      } 
     }); 

     tasks.Add(t); 
    } 

    Task.WaitAll(tasks.ToArray()); 
} 
+0

Cosa succederà se il metodo 'Process' funzionerà a lungo? 'concurrencySemaphore.Release()' può essere chiamato quando 'concurrencySemaphore' è già disponibile. E come risultato - 'ObjectDisposedException'. –

+0

@VasylZvarydchuk hai ragione. Ho aggiornato la risposta – ClearLogic

1

Si può semplicemente impostare il grado di concorrenza max come questo senso:

int maxConcurrency=10; 
var messages = new List<1000>(); 
using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency)) 
{ 
    foreach(var msg in messages) 
    { 
     Task.Factory.StartNew(() => 
     { 
      concurrencySemaphore.Wait(); 
      try 
      { 
       Process(msg); 
      } 
      finally 
      { 
       concurrencySemaphore.Release(); 
      } 
     }); 
    } 
} 
+1

Questo blocca inutilmente i thread, se il pool di thread ha più thread della concorrenza massima. – yaakov

11

Si potrebbe utilizzare Parallel.Foreach e contare su MaxDegreeOfParallelism invece.

Parallel.ForEach(messages, new ParallelOptions {MaxDegreeOfParallelism = 10}, 
msg => 
{ 
    // logic 
    Process(msg); 
}); 
+1

Questo è esattamente il tipo di elaborazione per cui è stato creato 'Parallel.ForEach'. –

+0

E dal momento che la libreria Task parallela è compilata su 'ThreadPool', possiamo presumere che eseguirà tutte le attività del sistema solo se non lo specifichiamo esplicitamente. – Toxantron

+0

Ciò garantirebbe che i messaggi vengano elaborati nello stesso ordine in cui si verificano nell'Elenco? – bit

1

pensare sarebbe meglio all'utente Parallel LINQ

Parallel.ForEach(messages , 
    new ParallelOptions{MaxDegreeOfParallelism = 4}, 
      x => Process(x); 
     ); 

dove x è il grado massimo di parallelismo

0

Se è necessario l'accodamento in ordine (l'elaborazione può essere completata in qualsiasi ordine), non è necessario un semaforo. Vecchio stile se le affermazioni funzionano bene:

 const int maxConcurrency = 5; 
     List<Task> tasks = new List<Task>(); 
     foreach (var arg in args) 
     { 
      var t = Task.Run(() => { Process(arg); }); 

      tasks.Add(t); 

      if(tasks.Count >= maxConcurrency) 
       Task.WaitAny(tasks.ToArray()); 
     } 

     Task.WaitAll(tasks.ToArray()); 
Problemi correlati