9

Ho un'applicazione in cui ho 1000+ piccole parti di 1 file di grandi dimensioni.Esecuzione numero N di thread in parallelo e in modo sequenziale

Devo caricare massimo 16 parti alla volta.

Ho utilizzato la libreria parallela di thread di .Net.

Ho usato Parallel.For dividere in più parti e assegnato 1 metodo che dovrebbe essere eseguito per ogni parte e impostare DegreeOfParallelism a 16.

devo eseguire 1 metodo con valori di checksum generati da parte diversa caricamenti, quindi devo impostare determinati meccanismi in cui devo aspettare che tutte le parti caricate dicano 1000 per completare. Nella libreria TPL sto affrontando 1 problema è che sta eseguendo casualmente uno qualsiasi dei 16 thread da 1000.

Voglio un meccanismo con cui posso eseguire inizialmente i primi 16 thread, se il 1 ° o 2 ° o uno dei 16 thread completa il suo compito la prossima parte 17 dovrebbe essere avviata.

Come posso ottenere questo?

enter image description here

+4

mi piace l'immagine – Abdullah

+0

Se @ di usr risposta non funziona, dare un'occhiata al [la mia risposta qui] (http://stackoverflow.com/a/15056827/106159) che potrebbero essere applicati. Altrimenti se puoi usare le classi DataflowBlock di TPL che potrebbero essere migliori (sto pensando che non puoi perché hai specificato C# 4) –

+0

@Abdullah Manca solo qualche cerchio rosso disegnato a mano – TheLethalCoder

risposta

2

Ecco il modo manuale per farlo.

È necessaria una coda. La coda è una sequenza di attività in sospeso. Devi dequeue e metterli nella lista delle attività lavorative. Quando l'attività è terminata, rimuovila dall'elenco delle attività lavorative e prendine un'altra dalla coda. Il thread principale controlla questo processo. Ecco l'esempio di come farlo.

Per il test ho utilizzato l'elenco di numeri interi ma dovrebbe funzionare per altri tipi poiché utilizza generici.

private static void Main() 
{ 
    Random r = new Random(); 
    var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList(); 

    ParallelQueue(items, DoWork); 
} 

private static void ParallelQueue<T>(List<T> items, Action<T> action) 
{ 
    Queue pending = new Queue(items); 
    List<Task> working = new List<Task>(); 

    while (pending.Count + working.Count != 0) 
    { 
     if (pending.Count != 0 && working.Count < 16) // Maximum tasks 
     { 
      var item = pending.Dequeue(); // get item from queue 
      working.Add(Task.Run(() => action((T)item))); // run task 
     } 
     else 
     { 
      Task.WaitAny(working.ToArray()); 
      working.RemoveAll(x => x.IsCompleted); // remove finished tasks 
     } 
    } 
} 

private static void DoWork(int i) // do your work here. 
{ 
    // this is just an example 
    Task.Delay(i).Wait(); 
    Console.WriteLine(i); 
} 

Per favore fatemi sapere se incontrate problemi su come implementare DoWork per conto vostro. perché se si modifica la firma del metodo potrebbe essere necessario apportare alcune modifiche.

Aggiornamento

Si può anche fare questo con asincrono attendono senza bloccare il thread principale.

private static void Main() 
{ 
    Random r = new Random(); 
    var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList(); 

    Task t = ParallelQueue(items, DoWork); 

    // able to do other things. 

    t.Wait(); 
} 

private static async Task ParallelQueue<T>(List<T> items, Func<T, Task> func) 
{ 
    Queue pending = new Queue(items); 
    List<Task> working = new List<Task>(); 

    while (pending.Count + working.Count != 0) 
    { 
     if (working.Count < 16 && pending.Count != 0) 
     { 
      var item = pending.Dequeue(); 
      working.Add(Task.Run(async() => await func((T)item))); 
     } 
     else 
     { 
      await Task.WhenAny(working); 
      working.RemoveAll(x => x.IsCompleted); 
     } 
    } 
} 

private static async Task DoWork(int i) 
{ 
    await Task.Delay(i); 
} 
+0

Perché stai usando 'new Task' e non' Task.Run'? Anche il blocco con 'Wait' nel codice server è in genere una cattiva idea. – avo

+0

Ho appena dato la via. 'Task.Delay.Wait' era solo un esempio. puoi fare tutto ciò che vuoi all'interno di 'DoWork'. Apprezzo che tu mi abbia aiutato. ho risolto il codice è meglio ora (?) .... non ho esperienza in multi-threading.anche sono nuovo nella programmazione (solo 1 anno) quindi portami con me;) @avo –

+0

@avo guarda l'aggiornamento. è buono? –

4
var workitems = ... /*e.g. Enumerable.Range(0, 1000000)*/; 
SingleItemPartitioner.Create(workitems) 
.AsParallel() 
.AsOrdered() 
.WithDegreeOfParallelism(16) 
.WithMergeOptions(ParallelMergeOptions.NotBuffered) 
.ForAll(i => { Thread.Slee(1000); Console.WriteLine(i); }); 

questo dovrebbe essere tutto ciò che serve. Ho dimenticato come i metodi sono chiamati esattamente ... Guarda la documentazione.

Verificare questo stampando sulla console dopo aver dormito per 1 sec (come questo codice di esempio).

+0

la mia risposta è stata accettata. ma penso che non dovrebbe essere. Penso che l'OP avesse quello che è descritto qui http://stackoverflow.com/questions/33869830/ordered-parallel-is-not-working-as-expected-convert-list-into-ienumerable/33869969#33869969 .'workitems. Selezionare (x => x) 'è necessario per correggere questo parallelo per Elenco di elementi. Il tuo approccio dividerà la lista in blocchi e questo non era ciò che l'OP voleva. quindi cambiare lista in ienumerable risolverà il suo problema. –

+0

@ M.kazemAkhgary che in realtà è un buon punto, ho dimenticato il partizionamento. Questo è davvero un cattivo TPL di default, un altro. – usr

5

Un possibile candidato per questo può essere TPL Dataflow. Questa è una dimostrazione che acquisisce un flusso di numeri interi e li stampa sulla console. È possibile impostare i MaxDegreeOfParallelism a seconda di quale molti fili che si desidera far girare in parallelo:

void Main() 
{ 
    var actionBlock = new ActionBlock<int>(
      i => Console.WriteLine(i), 
      new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 16}); 

    foreach (var i in Enumerable.Range(0, 200)) 
    { 
     actionBlock.Post(i); 
    } 
} 

Questo può anche scalare bene se si vuole avere più produttori/consumatori.

1

Un'altra opzione sarebbe quella di utilizzare un BlockingCollection<T> come una coda tra il filo lettore di file e le tue 16 thread uploader.Ogni thread di uploader circolerà semplicemente attorno al consumo della raccolta di blocchi finché non sarà completo.

E, se si desidera limitare il consumo di memoria nella coda, è possibile impostare un limite superiore sulla raccolta di blocco in modo tale che il thread del lettore di file si fermi quando il buffer ha raggiunto la capacità. Ciò è particolarmente utile in un ambiente server in cui potrebbe essere necessario limitare la memoria utilizzata per chiamata utente/API.

// Create a buffer of 4 chunks between the file reader and the senders 
BlockingCollection<Chunk> queue = new BlockingCollection<Chunk>(4); 

// Create a cancellation token source so you can stop this gracefully 
CancellationTokenSource cts = ... 

thread di lettura File

... 
queue.Add(chunk, cts.Token); 
... 
queue.CompleteAdding(); 

Invio discussioni

for(int i = 0; i < 16; i++) 
{ 
    Task.Run(() => { 
     foreach (var chunk in queue.GetConsumingEnumerable(cts.Token)) 
     { 
      .. do the upload 
     } 
    }); 
} 
Problemi correlati