2013-02-15 17 views
10

Ho un metodo asincrono predicato in questo modo:Come posso usare "Dove" con un predicato asincrono?

private async Task<bool> MeetsCriteria(Uri address) 
{ 
    //Do something involving awaiting an HTTP request. 
} 

Dire che ho una collezione di Uri s:

var addresses = new[] 
{ 
    new Uri("http://www.google.com/"), 
    new Uri("http://www.stackoverflow.com/") //etc. 
}; 

voglio filtrare addresses utilizzando MeetsCriteria. Voglio farlo in modo asincrono; Voglio che più chiamate al predicato siano eseguite in modo asincrono, e voglio quindi aspettare che tutte vengano completate e produrre il set di risultati filtrato. Purtroppo, LINQ non sembra supportare predicati asincrone, quindi qualcosa di simile non lo fa lavoro:

var filteredAddresses = addresses.Where(MeetsCriteria); 

C'è un modo simile conveniente per fare questo?

+2

Cosa ti aspetti che accadrebbe se questo fosse supportato? Soprattutto quando iterating 'filteredAddresses' che è quando viene chiamato' MeetsCriteria'. –

+0

@DanielHilgarth: Grazie; è un buon punto. Questo non sembra adattarsi a LINQ. – Sam

risposta

6

Credo che uno dei motivi per niente come questo è nel quadro è che c'è un sacco di possibili varianti e ogni scelta sarà quella giusta in determinate circostanze:

  • Qualora i predicati eseguire in parallelo, o in serie?
    • Se eseguono in parallelo, dovrebbero essere eseguiti tutti insieme o il grado di parallelismo dovrebbe essere limitato?
    • Se eseguono in parallelo, i risultati devono essere nello stesso ordine della raccolta originale, nell'ordine di completamento o in un ordine non definito?
      • Se devono essere restituiti nell'ordine di completamento, dovrebbe esserci un modo per ottenere (asincronicamente) i risultati mentre vengono completati? (Ciò richiederebbe il cambio di tipo di ritorno da Task<IEnumerable<T>> a qualcos'altro.)

Hai detto che desidera che i predicati di eseguire in parallelo. In tal caso, la scelta più semplice è quello di eseguire tutti in una volta e restituirli in ordine di esecuzione:

static async Task<IEnumerable<T>> Where<T>(
    this IEnumerable<T> source, Func<T, Task<bool>> predicate) 
{ 
    var results = new ConcurrentQueue<T>(); 
    var tasks = source.Select(
     async x => 
     { 
      if (await predicate(x)) 
       results.Enqueue(x); 
     }); 
    await Task.WhenAll(tasks); 
    return results; 
} 

È quindi possibile utilizzare in questo modo:

var filteredAddresses = await addresses.Where(MeetsCriteria); 
+1

Userei un nome di metodo diverso, quindi le diverse semantiche (in particolare il riordino) diventano chiare. – CodesInChaos

+0

@CodesInChaos Sì, forse, ma non sono sicuro di quale sarebbe un buon nome. 'AsyncParallelWhereOrderedByCompletion()' descriverebbe il metodo, ma è un nome terribile. – svick

+0

Forse un nome come "ConcurrentlyFilterAsync' sarebbe adatto. – Sam

5

Primo approccio: problema tutto chiede in anticipo uno dopo l'altro, quindi attendi fino a quando tutte le richieste non tornano, quindi filtra il risultato. (Anche il codice di svick ha fatto questo, ma qui lo sto facendo senza il ConcurrentQueue intermedio).

// First approach: massive fan-out 
var tasks = addresses.Select(async a => new { A = a, C = await MeetsCriteriaAsync(a) }); 
var addressesAndCriteria = await Task.WhenAll(tasks); 
var filteredAddresses = addressAndCriteria.Where(ac => ac.C).Select(ac => ac.A); 

Secondo approccio: eseguire le richieste una dopo l'altra. Questo richiederà più tempo ma si farà in modo di non martellare il webservice con un enorme attacco di richieste (supponendo che MeetsCriteriaAsync va a un webservice ...)

// Second approach: one by one 
var filteredAddresses = new List<Uri>(); 
foreach (var a in filteredAddresses) 
{ 
    if (await MeetsCriteriaAsync(a)) filteredAddresses.Add(a); 
} 

Terzo approccio: come per il secondo, ma usando un ipotetico C# 8 presenta "flussi asincroni". Il C# 8 non è ancora uscito, e gli stream asincroni non sono ancora stati progettati, ma possiamo sognare! Il tipo IAsyncEnumerable esiste già in RX e si spera che aggiungano altri combinatori per questo.La cosa bella di IAsyncEnumerable è che possiamo iniziare a consumare i primi filteredAddress non appena arrivano, piuttosto che aspettare che tutto venga filtrato per primo.

// Third approach: ??? 
IEnumerable<Uri> addresses = {...}; 
IAsyncEnumerable<Uri> filteredAddresses = addresses.WhereAsync(MeetsCriteriaAsync); 

Quarto approccio: forse noi non vogliamo martello il webservice con tutte le richieste tutti in una volta, ma siamo felici di emettere più di una richiesta alla volta. Forse abbiamo fatto esperimenti e abbiamo scoperto che "tre alla volta" era un mezzo felice. NOTA: questo codice presuppone un contesto di esecuzione a thread singolo come nella programmazione dell'interfaccia utente o ASP.NET. Se viene eseguito in un contesto di esecuzione multithread, è necessario un ConcurrentQueue e ConcurrentList.

// Fourth approach: throttle to three-at-a-time requests 
var addresses = new Queue<Uri>(...); 
var filteredAddresses = new List<Uri>(); 
var worker1 = FilterAsync(addresses, filteredAddresses); 
var worker2 = FilterAsync(addresses, filteredAddresses); 
var worker3 = FilterAsync(addresses, filteredAddresses); 
await Task.WhenAll(worker1, worker2, worker3); 

async Task FilterAsync(Queue<Uri> q, List<Uri> r) 
{ 
    while (q.Count > 0) 
    { 
    var item = q.Dequeue(); 
    if (await MeetsCriteriaAsync(item)) r.Add(item); 
    } 
} 

Ci sono modi per il quarto approccio utilizzando anche la libreria di flusso di dati TPL.

Problemi correlati