14

Mentre utilizzavo Parallel.ForEach nel mio programma, ho trovato che alcuni thread non sembravano mai finire. In effetti, continuava a generare nuovi thread più e più volte, un comportamento che non mi aspettavo e sicuramente non volevo.Parallel.ForEach continua a generare nuovi thread

sono stato in grado di riprodurre questo comportamento con il seguente codice, che, proprio come il mio programma 'reale', entrambi gli usi processore e la memoria molto (NET 4.0 codice):

public class Node 
{ 
    public Node Previous { get; private set; } 

    public Node(Node previous) 
    { 
     Previous = previous; 
    } 
} 

public class Program 
{ 
    public static void Main(string[] args) 
    { 
     DateTime startMoment = DateTime.Now; 
     int concurrentThreads = 0; 

     var jobs = Enumerable.Range(0, 2000); 
     Parallel.ForEach(jobs, delegate(int jobNr) 
     { 
      Interlocked.Increment(ref concurrentThreads); 

      int heavyness = jobNr % 9; 

      //Give the processor and the garbage collector something to do... 
      List<Node> nodes = new List<Node>(); 
      Node current = null; 
      for (int y = 0; y < 1024 * 1024 * heavyness; y++) 
      { 
       current = new Node(current); 
       nodes.Add(current); 
      } 

      TimeSpan elapsed = DateTime.Now - startMoment; 
      int threadsRemaining = Interlocked.Decrement(ref concurrentThreads); 
      Console.WriteLine("[{0:mm\\:ss}] Job {1,4} complete. {2} threads remaining.", elapsed, jobNr, threadsRemaining); 
     }); 
    } 
} 

Quando viene eseguito sul mio quad-core, inizia inizialmente con 4 thread simultanei, proprio come ci si aspetterebbe. Tuttavia, nel tempo vengono creati sempre più thread. Alla fine, questo programma quindi genera un OutOfMemoryException:

[00:00] Job 0 complete. 3 threads remaining. 
[00:01] Job 1 complete. 4 threads remaining. 
[00:01] Job 2 complete. 4 threads remaining. 
[00:02] Job 3 complete. 4 threads remaining. 
[00:05] Job 9 complete. 5 threads remaining. 
[00:05] Job 4 complete. 5 threads remaining. 
[00:05] Job 5 complete. 5 threads remaining. 
[00:05] Job 10 complete. 5 threads remaining. 
[00:08] Job 11 complete. 5 threads remaining. 
[00:08] Job 6 complete. 5 threads remaining. 
... 
[00:55] Job 67 complete. 7 threads remaining. 
[00:56] Job 81 complete. 8 threads remaining. 
... 
[01:54] Job 107 complete. 11 threads remaining. 
[02:00] Job 121 complete. 12 threads remaining. 
.. 
[02:55] Job 115 complete. 19 threads remaining. 
[03:02] Job 166 complete. 21 threads remaining. 
... 
[03:41] Job 113 complete. 28 threads remaining. 
<OutOfMemoryException> 

Il grafico utilizzo della memoria per l'esperimento di cui sopra è il seguente:

Processor and memory usage

(L'immagine è in Olandese, la parte superiore rappresenta processore utilizzo, l'utilizzo della memoria della parte inferiore.) Come si può vedere, sembra che un nuovo thread venga generato quasi ogni volta che il garbage collector si mette in mezzo (come si può vedere negli intervalli di utilizzo della memoria).

Qualcuno può spiegare perché questo sta accadendo e cosa posso fare al riguardo? Voglio solo .NET per fermare la deposizione delle uova nuove discussioni, e finire i thread esistenti prima ...

+0

ho girato una domanda di follow-up [ ] (http://stackoverflow.com/questions/15381174/how-to-count-the-amount-of-concurrent-threads-in-net-application) Se per i thread direttamente, c sono per lo più in aumento (molto raramente e in modo insignificante decrescente) – Fulproof

risposta

16

È possibile limitare il numero massimo di thread che vengono creati specificando un'istanza ParallelOptions con il set MaxDegreeOfParallelism proprietà:

var jobs = Enumerable.Range(0, 2000); 
ParallelOptions po = new ParallelOptions 
{ 
    MaxDegreeOfParallelism = Environment.ProcessorCount 
}; 

Parallel.ForEach(jobs, po, jobNr => 
{ 
    // ... 
}); 

per quanto riguarda il motivo per cui che stai ricevendo il comportamento che stai osservando: la TPL (che è alla base PLINQ) è, per impostazione predefinita, la libertà di indovinare il numero ottimale di thread da utilizzare. Ogni volta che un task parallelo si blocca, l'utilità di pianificazione può creare un nuovo thread per mantenere il progresso. Nel tuo caso, il blocco potrebbe avvenire in modo implicito; ad esempio, tramite la chiamata Console.WriteLine o (come osservato) durante la procedura di garbage collection.

Da Concurrency Levels Tuning with Task Parallel Library (How Many Threads to Use?):

Poiché la politica TPL predefinito è di usare un thread per processore, possiamo concludere che TPL assume inizialmente che il carico di lavoro di un compito è ~ 100% e 0% di attesa, e se l'ipotesi iniziale fallisce e l'attività entra in uno stato di attesa (cioè inizia a bloccare) - TPL con la libertà di aggiungere thread come appropriato.

+1

Chiarimento: 'Parallel.ForEach()' è * non * parte di PLINQ. – svick

+0

@svick: hai ragione; è TPL. – Douglas

+0

Grazie per la risposta. Ha senso che quando un thread blocca TPL crea un nuovo thread per mantenere i progressi in corso, non pensavo che il garbage collector che puliva potesse contare come un thread bloccato. Inoltre, grazie per il link che hai postato, chiarisce perché potresti volere più thread rispetto ai core, non ci avevo mai pensato. – Astrotrain

6

Probabilmente dovresti leggere un po 'come funziona l'utilità di pianificazione.

http://msdn.microsoft.com/en-us/library/ff963549.aspx (seconda metà della pagina)

"Il pool di thread NET gestisce automaticamente il numero di lavoratore thread nel pool. Si aggiunge e rimuove fili secondo incorporato euristica. Il .Il pool di thread NET ha due meccanismi principali per l'iniezione di thread : un meccanismo di eliminazione di fame che aggiunge thread di lavoro se non vede alcun progresso sugli elementi in coda e su una scalata scalata che tenta di massimizzare il throughput mentre utilizza come pochi thread il più possibile

L'obiettivo di evitare la fame è prevenire lo stallo. Questo tipo di deadlock può verificarsi quando un thread di lavoro attende un evento di sincronizzazione che può essere soddisfatto solo da un elemento di lavoro che è ancora in sospeso nelle code globali o locali del pool di thread. Se ci fosse un numero fisso di thread di lavoro e tutti quei thread fossero allo stesso modo bloccati, il sistema non sarebbe in grado di fare ulteriori progressi. L'aggiunta di un nuovo thread di lavoro risolve il problema.

Un obiettivo dell'euristica di scalata è quello di migliorare l'utilizzo dei core quando i thread sono bloccati dall'I/O o da altre condizioni di attesa che arrestano il processore. Per impostazione predefinita, il pool di thread gestito ha un thread di lavoro per core. Se uno di questi thread di lavoro diventa bloccato, è possibile che un core sia sottoutilizzato, , a seconda del carico di lavoro complessivo del computer. La logica dell'iniezione di thread non distingue tra un thread che è bloccato e un thread che sta eseguendo un'operazione lunga e con utilizzo intensivo del processore. Pertanto, ogni volta che le code globali o locali del pool di thread contengono in attesa di lavoro elementi, elementi di lavoro attivi che richiedono molto tempo per l'esecuzione (più di un mezzo secondo) può innescare la creazione di nuovo thread di lavoro piscina discussioni."

è possibile contrassegnare un'attività come LongRunning ma questo ha l'effetto collaterale di allocare un filo per esso da fuori pool di thread che significa che il compito non può essere inline.

ricordare che il ParallelFor ossequi il lavoro è dato come blocchi, quindi anche se il lavoro in un ciclo è abbastanza piccolo, il lavoro complessivo svolto dall'attività richiamata dall'aspetto potrebbe apparire più lungo per lo scheduler.

La maggior parte delle chiamate al GC di e di loro stessi non sono bloccanti (viene eseguito su un thread separato) ma se si attende il completamento di GC, ciò viene bloccato. Ricorda anche che il GC sta riorganizzando la memoria in modo che questo possa avere alcuni effetti collaterali (e il blocco) se stai cercando di allocare memoria durante l'esecuzione di GC. Non ho specifiche qui ma so che la PPL ha alcune caratteristiche di allocazione della memoria specificamente per la gestione della memoria concorrente per questo motivo.

Guardando l'output del codice sembra che le cose siano in esecuzione per molti secondi. Quindi non sono sorpreso che tu stia vedendo un'iniezione di filo. Tuttavia, mi sembra di ricordare che la dimensione del pool di thread predefinita è di circa 30 thread (probabilmente in base al numero di core sul sistema). Un thread occupa circa un MB di memoria prima che il codice lo allochi di più quindi non sono chiaro perché potresti ottenere un'eccezione di memoria insufficiente qui.

+1

Grazie per le informazioni di base. Come puoi vedere ogni task alloca tra 0 e 8 milioni di nodi e occupa quindi una buona quantità di memoria. Man mano che vengono avviate più attività, la memoria diventa scarsa e il GC deve liberare memoria più frequentemente per adattarsi alle nuove attività. Sospetto che lo scheduler non possa sapere se un thread è bloccato perché è in attesa di memoria o di qualcos'altro, e avvia un nuovo thread in entrambi i casi. Ovviamente, se il thread bloccato era in attesa di memoria, questo peggiora solo l'effetto ... – Astrotrain

+0

Sto vedendo lo stesso effetto, e sono un po 'sorpreso che questo non sia gestito meglio dal TPL. Sto caricando blocchi di dati dal web da 3M + posizioni diverse e faccio qualche elaborazione su ogni blocco dopo che è stato caricato in memoria. Mentre le attività sono in attesa che i dati vengano caricati dal Web, il TPL vede il blocco del carico Web e pianifica nuove attività per caricare ancora più dati. Dopo un po ', ci sono così tante attività che caricano blocchi di dati che alla fine finisco per ottenere le eccezioni OOM. L'importazione di 'Parallel.ForEach' per usare un valore fisso' MaxDegreeOfParallelism' ha risolto il problema. –

1

ho postato la domanda di follow-up "How to count the amount of concurrent threads in .NET application?"

Se a contare direttamente i fili, il loro numero in Parallel.For() per lo più ((molto raramente e insignificante diminuzione) aumenta e non è releleased dopo ciclo completamento.

controllato questo sia in modalità di rilascio e di debug, con

ParallelOptions po = new ParallelOptions 
{ 
    MaxDegreeOfParallelism = Environment.ProcessorCount 
}; 

e senza

Le cifre variano, ma le conclusioni sono le stesse.

ecco il codice pronto stavo usando, se qualcuno vuole giocare con: "Come per contare il numero di thread concorrenti in applicazione .NET"

using System; 
using System.Collections.Generic; 
using System.Diagnostics; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 

namespace Edit4Posting 
{ 
public class Node 
{ 

    public Node Previous { get; private set; } 
    public Node(Node previous) 
    { 
    Previous = previous; 
    } 
    } 
    public class Edit4Posting 
    { 

    public static void Main(string[] args) 
    { 
     int concurrentThreads = 0; 
     int directThreadsCount = 0; 
     int diagThreadCount = 0; 

     var jobs = Enumerable.Range(0, 160); 
     ParallelOptions po = new ParallelOptions 
     { 
     MaxDegreeOfParallelism = Environment.ProcessorCount 
     }; 
     Parallel.ForEach(jobs, po, delegate(int jobNr) 
     //Parallel.ForEach(jobs, delegate(int jobNr) 
     { 
     int threadsRemaining = Interlocked.Increment(ref concurrentThreads); 

     int heavyness = jobNr % 9; 

     //Give the processor and the garbage collector something to do... 
     List<Node> nodes = new List<Node>(); 
     Node current = null; 
     //for (int y = 0; y < 1024 * 1024 * heavyness; y++) 
     for (int y = 0; y < 1024 * 24 * heavyness; y++) 
     { 
      current = new Node(current); 
      nodes.Add(current); 
     } 
     //******************************* 
     directThreadsCount = Process.GetCurrentProcess().Threads.Count; 
     //******************************* 
     threadsRemaining = Interlocked.Decrement(ref concurrentThreads); 
     Console.WriteLine("[Job {0} complete. {1} threads remaining but directThreadsCount == {2}", 
      jobNr, threadsRemaining, directThreadsCount); 
     }); 
     Console.WriteLine("FINISHED"); 
     Console.ReadLine(); 
    } 
    } 
} 
Problemi correlati