2014-04-16 13 views
6

Ho alcune centinaia di file che ho bisogno di caricare in Azure Blob Storage.
Desidero utilizzare la libreria di attività parallela.
Ma invece di eseguire tutti i 100 thread da caricare in un foreach nell'elenco dei file, come posso mettere un limite al numero massimo di thread che può utilizzare e finire il lavoro in parallelo. o bilancia automaticamente le cose?Numero limite di thread in Task Libreria parallela

+1

Non dovrebbe usare discussioni per questo a tutti. Esiste per questo una API basata su 'Task', che è naturalmente asincrona: [CloudBlockBUp.UploadFromFileAsync] (http://msdn.microsoft.com/en-us/library/dn451828.aspx). Sei limitato a VS2010 e non puoi usare 'async/await' (quindi hai taggato la domanda con" C# 4.0 ")? – Noseratio

+0

se ricordo correttamente userà tanti thread quanti sono i core disponibili. Non riesco a ricordare dove ho letto però. Poteva essere un blog di MS o una risposta su SO quando mi chiedevo se fosse necessario. Potresti semplicemente provarlo in un'applicazione di test con una lista di 100 ints usando Parallel. – Dbl

+1

@Noseratio non limitato a VS2010 .. posso anche usare C# 5.0 .. fammi includere come tag .. – Seenu

risposta

9

Non si dovrebbero utilizzare thread per questo. C'è una API basata su Task per questo, che è naturalmente asincrona: CloudBlockBlob.UploadFromFileAsync. Usalo con async/await e SemaphoreSlim per limitare il numero di caricamenti paralleli.

Esempio (non testata):

const MAX_PARALLEL_UPLOADS = 5; 

async Task UploadFiles() 
{ 
    var files = new List<string>(); 
    // ... add files to the list 

    // init the blob block and 
    // upload files asynchronously 
    using (var blobBlock = new CloudBlockBlob(url, credentials)) 
    using (var semaphore = new SemaphoreSlim(MAX_PARALLEL_UPLOADS)) 
    { 
     var tasks = files.Select(async(filename) => 
     { 
      await semaphore.WaitAsync(); 
      try 
      { 
       await blobBlock.UploadFromFileAsync(filename, FileMode.Create); 
      } 
      finally 
      { 
       semaphore.Release(); 
      } 
     }).ToArray(); 

     await Task.WhenAll(tasks); 
    } 
} 
2

Hai provato a utilizzare MaxDegreeOfParallelism? Come questo:

System.Threading.Tasks.Parallel.Invoke(
new Tasks.ParallelOptions {MaxDegreeOfParallelism = 5 }, actionsArray) 
0

Si può scoprire eseguendo questo:

class Program 
{ 
    static void Main(string[] args) 
    { 
     var list = new List<int>(); 

     for (int i = 0; i < 100; i++) 
     { 
      list.Add(i); 
     } 

     var runningIndex = 0; 

     Task.Factory.StartNew(() => Action(ref runningIndex)); 

     Parallel.ForEach(list, i => 
     { 
      runningIndex ++; 
      Console.WriteLine(i); 
      Thread.Sleep(3000); 
     }); 

     Console.ReadKey(); 
    } 

    private static void Action(ref int number) 
    { 
     while (true) 
     { 
      Console.WriteLine("worked through {0}", number); 
      Thread.Sleep(2900); 
     } 
    } 
} 

Come si può vedere il numero di parallelismo è più piccolo al via, diventa più grande, e diventa più piccolo verso la fine. Quindi c'è sicuramente una sorta di ottimizzazione automatica in corso.

0

In sostanza si sta andando a voler creare un'azione o compiti nuovi per ciascun file da caricare, metterli in una lista, e quindi elaborare tale elenco, la limitazione del numero che può essere elaborato in parallelo.

My blog post mostra come eseguire questa operazione sia con Attività che con Azioni e fornisce un progetto di esempio che è possibile scaricare ed eseguire per vedere entrambi in azione.

Con azioni

Se utilizzando azioni, è possibile utilizzare la funzione built-in .Net Parallel.Invoke. Qui lo limitiamo a eseguire al massimo 5 thread in parallelo.

var listOfActions = new List<Action>(); 
foreach (var file in files) 
{ 
    var localFile = file; 
    // Note that we create the Task here, but do not start it. 
    listOfTasks.Add(new Task(() => blobBlock.UploadFromFileAsync(localFile, FileMode.Create))); 
} 

var options = new ParallelOptions {MaxDegreeOfParallelism = 5}; 
Parallel.Invoke(options, listOfActions.ToArray()); 

Questa opzione non fa uso della natura asincrona di UploadFromFileAsync però, così si potrebbe desiderare di utilizzare l'esempio Task qui sotto.

con compiti

Con compiti non v'è alcuna funzione built-in. Tuttavia, puoi utilizzare quello che fornisco sul mio blog.

/// <summary> 
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel. 
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> 
    /// </summary> 
    /// <param name="tasksToRun">The tasks to run.</param> 
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken()) 
    { 
     await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken); 
    } 

    /// <summary> 
    /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel. 
    /// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para> 
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> 
    /// </summary> 
    /// <param name="tasksToRun">The tasks to run.</param> 
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> 
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken()) 
    { 
     // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly. 
     var tasks = tasksToRun.ToList(); 

     using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel)) 
     { 
      var postTaskTasks = new List<Task>(); 

      // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running. 
      tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release()))); 

      // Start running each task. 
      foreach (var task in tasks) 
      { 
       // Increment the number of tasks currently running and wait if too many are running. 
       await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken); 

       cancellationToken.ThrowIfCancellationRequested(); 
       task.Start(); 
      } 

      // Wait for all of the provided tasks to complete. 
      // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object. 
      await Task.WhenAll(postTaskTasks.ToArray()); 
     } 
    } 

E poi creare l'elenco delle attività e chiamare la funzione di farli correre, con dire un massimo di 5 contemporaneamente alla volta, si potrebbe fare questo:

var listOfTasks = new List<Task>(); 
foreach (var file in files) 
{ 
    var localFile = file; 
    // Note that we create the Task here, but do not start it. 
    listOfTasks.Add(new Task(async() => await blobBlock.UploadFromFileAsync(localFile, FileMode.Create))); 
} 
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 5); 
Problemi correlati