2015-11-25 13 views
17

Ho un'enumerazione di elementi (RunData.Demand), ognuno dei quali rappresenta un'operazione che comporta la chiamata di un'API su HTTP. Funziona benissimo se ho solo foreach attraverso tutto e chiama l'API durante ogni iterazione. Tuttavia, ogni iterazione richiede un secondo o due, quindi mi piacerebbe eseguire 2-3 thread e dividere il lavoro tra di loro. Ecco quello che sto facendo:Come accodare correttamente le attività da eseguire in C#

ThreadPool.SetMaxThreads(2, 5); // Trying to limit the amount of threads 
var tasks = RunData.Demand 
    .Select(service => Task.Run(async delegate 
    { 
     var availabilityResponse = await client.QueryAvailability(service); 
     // Do some other stuff, not really important 
    })); 

await Task.WhenAll(tasks); 

La chiamata client.QueryAvailability chiama fondamentalmente un'API utilizzando la classe HttpClient:

public async Task<QueryAvailabilityResponse> QueryAvailability(QueryAvailabilityMultidayRequest request) 
{ 
    var response = await client.PostAsJsonAsync("api/queryavailabilitymultiday", request); 

    if (response.IsSuccessStatusCode) 
    { 
     return await response.Content.ReadAsAsync<QueryAvailabilityResponse>(); 
    } 

    throw new HttpException((int) response.StatusCode, response.ReasonPhrase); 
} 

Questa grande opera per un po ', ma alla fine le cose cominciano timeout. Se imposto il timeout di HttpClient a un'ora, inizierò a ricevere strani errori del server interno.

Quello che ho iniziato a fare era impostare un cronometro nel metodo QueryAvailability per vedere cosa stava succedendo.

Quello che sta accadendo è che tutti i 1200 elementi in RunData.Demand vengono creati contemporaneamente e tutti i 1200 metodi await client.PostAsJsonAsync vengono chiamati. Sembra quindi che usi i 2 thread per ricontrollare lentamente le attività, quindi verso la fine ho compiti che sono stati in attesa di 9 o 10 minuti.

Ecco il comportamento vorrei:

Mi piacerebbe creare i 1.200 compiti, poi eseguirli 3-4 alla volta come le discussioni diventano disponibili. Io faccio non voglio fare la coda di 1.200 chiamate HTTP immediatamente.

C'è un buon modo per fare questo?

+0

Non sembra che tu crei un nuovo 'client' per ogni chiamata. Sai che 'System.Net.Http.HttpClient' non è thread-safe per le chiamate di istanza? Dovrebbe essere creata una nuova istanza per (e smaltita dopo) ogni chiamata. – Enigmativity

+0

Il metodo 'QueryAvailability' è in una classe che crea' HttpClient', che è un membro privato di quell'istanza. Non sapevo che non fosse thread-safe, potevo sicuramente crearlo prima di ogni chiamata. Lo esaminerò di più, grazie! –

+0

Hmm, ho fatto un po 'di ricerche e sembra che quello che sto facendo sia sicuro. Vedi [qui] (http://stackoverflow.com/questions/11178220/is-httpclient-safe-to-use-concurrently) e [here] (http://www.tomdupont.net/2014/11/net- 45-httpclient-is-thread-safe.html) –

risposta

17

Come consiglio sempre .. quello che ti serve è TPL Dataflow (per installare: Install-Package Microsoft.Tpl.Dataflow).

Si crea un ActionBlock con un'azione da eseguire su ciascun elemento. Impostare MaxDegreeOfParallelism per la limitazione. Iniziare a postare in esso e attendere il suo completamento:

var block = new ActionBlock<QueryAvailabilityMultidayRequest>(async service => 
{ 
    var availabilityResponse = await client.QueryAvailability(service); 
    // ... 
}, 
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }); 

foreach (var service in RunData.Demand) 
{ 
    block.Post(service); 
} 

block.Complete(); 
await block.Completion; 
+0

Anche questo sembra promettente, e sembra che funzioni bene con i metodi 'async' (ad esempio, posso' attendere 'al completamento del blocco) .. Darò questo a tiro. –

+0

@MikeChristensen è. È una delle poche librerie in .Net che sono state scritte appositamente con async-attendono in mente. – i3arnon

+2

Funziona perfettamente! Sono ora un fan. Accettare. –

3

Stai utilizzando le chiamate asincrone HTTP, in modo da limitare il numero di thread non aiuterà (né sarà ParallelOptions.MaxDegreeOfParallelism in Parallel.ForEach come una delle risposte suggerisce). Anche un singolo thread può avviare tutte le richieste ed elaborare i risultati non appena arrivano.

Un modo per risolverlo è utilizzare TPL Dataflow.

Un'altra bella soluzione è quella di dividere la fonte IEnumerable in partizioni e gli elementi di processo in ogni partizione in sequenza come descritto in this blog post:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll(
     from partition in Partitioner.Create(source).GetPartitions(dop) 
     select Task.Run(async delegate 
     { 
      using (partition) 
       while (partition.MoveNext()) 
        await body(partition.Current); 
     })); 
} 
3

Mentre la biblioteca Dataflow è grande, penso che sia un po 'pesante quando non si usa blocco della composizione. Tenderei ad usare qualcosa come il metodo di estensione qui sotto.

Inoltre, a differenza del metodo di partizionamento, questo esegue i metodi asincroni sul contesto di chiamata - l'avvertenza è che se il codice non è veramente asincrono, o prende un 'percorso veloce', allora sarà effettivamente dirette in modo sincrono in quanto non le discussioni sono creati in modo esplicito.

public static async Task RunParallelAsync<T>(this IEnumerable<T> items, Func<T, Task> asyncAction, int maxParallel) 
{ 
    var tasks = new List<Task>(); 

    foreach (var item in items) 
    { 
     tasks.Add(asyncAction(item)); 

     if (tasks.Count < maxParallel) 
       continue; 

     var notCompleted = tasks.Where(t => !t.IsCompleted).ToList(); 

     if (notCompleted.Count >= maxParallel) 
      await Task.WhenAny(notCompleted); 
    } 

    await Task.WhenAll(tasks); 
} 
3

vecchia questione, ma vorrei proporre una soluzione alternativa leggera utilizzando la classe SemaphoreSlim. Basta fare riferimento a System.Threading.

SemaphoreSlim sem = new SemaphoreSlim(4,4); 

foreach (var service in RunData.Demand) 
{ 

    await sem.WaitAsync(); 
    Task t = Task.Run(async() => 
    { 
     var availabilityResponse = await client.QueryAvailability(serviceCopy));  
     // do your other stuff here with the result of QueryAvailability 
    } 
    t.ContinueWith(sem.Release()); 
} 

Il semaforo funge da meccanismo di blocco. Puoi inserire il semaforo solo chiamando Wait (WaitAsync) che sottrae uno dal conteggio. Il rilascio della chiamata aggiunge uno al conteggio.

+1

Se ho capito bene, a seconda di come lo si usa, potrebbe funzionare bene con 'sem.Wait()' piuttosto che attendere 'sem.WaitASync() '. Fare così bloccherebbe il thread chiamante, quindi non dovrebbe essere fatto sul thread dell'interfaccia utente, ma su qualsiasi altro thread, potrebbe essere il modo più semplice per gestire il lavoro da fare. In particolare, se uno dei 4 sem è disponibile, procederà immediatamente. In caso contrario, attende fino a quando uno è disponibile. – ToolmakerSteve

Problemi correlati