2016-05-11 12 views
10

Sto lavorando a un'applicazione che chiama un servizio esterno e deve aggiungere tutte le voci della raccolta esterna in una raccolta locale. Il problema attualmente è che la raccolta esterna può superare i 1000 record, ma i risultati di ricerca restituiti possono includere solo fino a venti voci.Utilizzare più attività per recuperare tutti i record da una grande raccolta

Per motivi di velocità ho pensato che utilizzando una raccolta di attività sarebbe la via da seguire, così mi si avvicinò con il codice qui sotto:

int totalCount = returnedCol.total_count; 
     while (totalCount > myDict.Count) 
     { 
      int numberOfTasks = // logic to calculate how many tasks to run 

      List<Task> taskList = new List<Task>(); 

      for (int i = 1; i <= numberOfTasks; i++) 
      { 
       Interlocked.Add(ref pageNumber, pageSize); 

       Task<SearchResponse> testTask = Task.Run(() => 
       { 
        return ExternalCall.GetData(pageNumber, pageSize); 
       }); 

       Thread.Sleep(100); 

       taskList.Add(testTask); 
       testTask.ContinueWith(o => 
       { 
        foreach (ExternalDataRecord dataiwant in testTask.Result.dataiwant) 
        { 
         if (!myDict.ContainsKey(dataiwant.id)) 
          myDict.GetOrAdd(dataiwant.id, dataiwant); 
        } 
       }); 
      } 
      Task.WaitAll(taskList.ToArray()); 
     } 

Tuttavia, questo non produce tutti i risultati. La variabile pageNumber si incrementa correttamente ogni volta, ma sembra che non tutti i risultati dell'attività vengano analizzati (poiché la stessa logica su un singolo thread su un set di dati più piccolo restituisce tutti i risultati previsti). Inoltre, ho provato a dichiarare le singole attività in una catena (piuttosto che in un ciclo) e tutti i dati di test sono stati restituiti. Sembra che più alto è il valore che passo in Thread.Sleep() più i risultati vengono aggiunti alla raccolta locale (ma questo non è l'ideale, poiché significa che il processo richiede più tempo!)

Attualmente in un campione di 600 record I Sono solo circa 150-200 aggiunti alla collezione myDict. Mi manca qualcosa di ovvio?

+0

Sembra che "la logica per calcolare quanti compiti da eseguire" potrebbero essere rilevanti qui. Inoltre, quanto sei positivo che '(pageNumber, pageSize)' sia corretto? Potrei vedere il problema che si sta affrontando provenire dal fermare il ciclo prima di arrivare effettivamente alla fine della raccolta o dalla richiesta di blocchi di dati che sono in gran parte sovrapposti. – StriplingWarrior

+0

'pageSize' non cambia mai nel mio codice in quanto è sempre impostato sul valore più alto disponibile (cioè 20). Ho testato per vedere se il valore di 'pageNumber' sta aumentando correttamente ogni volta, e inizia a comportarsi da solo (aumentando di venti ogni volta), ma inizia a diventare irregolare. Aumentare la discussione.Il periodo di sospensione ha un effetto su questo, ma dal momento che lo sto incrementando in un modo sicuro per i thread non riesco a capire perché ciò accada –

+0

Inoltre, per motivi di test ho creato una fonte di dati esterna con una raccolta di 60 elementi e ho provato a recuperare un elemento alla volta tramite 60 attività, e non ho ancora ottenuto tutti i dati nella mia raccolta (a meno che non aumenti il ​​periodo di sospensione) –

risposta

0

Ti manca il fatto che lo ContinueWith() restituisca un altro compito e non lo stai aggiungendo come tuo taskList.

Un approccio migliore sarebbe utilizzare async/await disponibile da .NET 4.5. Fornisce un approccio meno pesante alla soluzione.

cambiereste l'algoritmo di essere più simile a questo:

public async Task Process() 
{ 
    int totalCount = returnedCol.total_count; 

    while (totalCount > myDict.Count) 
    { 
     int numberOfTasks = // logic to calculate how many tasks to run 

     List<Task> taskList = new List<Task>(); 

     for (int i = 1; i <= numberOfTasks; i++) 
     { 
      Interlocked.Add(ref pageNumber, pageSize); 

      taskList.Add(ProcessPage(pageNumber, pageSize)); 
     } 

     await Task.WhenAll(taskList.ToArray()); 
    } 
} 

private async Task ProcessPage(int pageNumber, int pageSize) 
{ 
     SearchResponse result = await Task.Run(() => 
      ExternalCall.GetData(pageNumber, pageSize)).ConfigureAwait(false); 

     foreach (ExternalDataRecord dataiwant in result.dataiwant) 
     { 
      myDict.GetOrAdd(dataiwant.id, dataiwant); 
     } 
} 

Il async parola chiave dice al compilatore che ci sarà un await in seguito. await gestisce essenzialmente i dettagli relativi alla chiamata ContinueWith. Se si desidera realmente che lo ExternalCall si verifichi in un'altra attività, sarà semplicemente await i risultati di tale chiamata.

+0

Grazie, ora ho tutti gli articoli nella mia collezione. Ora è il momento per il vero stress test :) –

+2

'ProcessPage' non ha alcun' attendi' all'interno di esso quindi non sarà asincrono. Il suo codice non verrà eseguito in parallelo perché 'ProcessPage' non tornerà mai prima del termine del lavoro. È necessario 'SearchResponse result = attendere Task.Run (() => ExternalCall.GetData (pageNumber, pageSize)). ConfigureAwait (false);' o anche meglio 'SearchResponse result = Attendi ExternalCall.GetDataAsync (pageNumber, pageSize) .ConfigureAwait (falso); 'se è disponibile. Essendo una chiamata esterna al webservice dovresti essere in grado di renderla facilmente asincrona senza usare 'Task.Run ('. –

+0

Grazie mille @ScottChamberlain, il test ha richiesto più tempo di quanto previsto, quindi tengo a mente questo –

2

Penso che se si prenda un approccio più funzionale e meno imperativo al codice, sarà molto meno probabile imbattersi in problemi difficili da comprendere. Penso che qualcosa di simile avrebbe avuto lo stesso effetto si sta andando per:

int totalCount = returnedCol.total_count; 
var tasks = Enumerable.Range(1, totalCount/pageSize) 
    .Select(async page => { 
     await Task.Delay(page * 100); 
     return ExternalCall.GetData(page, pageSize)); 
    }) 
    .ToArray(); 
myDict = (await Task.WhenAll(tasks)) 
    .ToDictionary(dataiwant => dataiwant.id); 

Il codice di cui sopra presuppone che si vuole ancora attendere 100ms tra le richieste per scopi di limitazione. Se hai appena avuto che Thread.Sleep() lì per cercare la risoluzione di problemi si stavano avendo, si potrebbe semplificare ulteriormente:

int totalCount = returnedCol.total_count; 
var tasks = Enumerable.Range(1, totalCount/pageSize) 
    .Select(async page => await Task.Run(() => ExternalCall.GetData(page, pageSize))) 
    .ToArray(); 
myDict = (await Task.WhenAll(tasks)) 
    .ToDictionary(dataiwant => dataiwant.id); 
+0

L'unica cosa che mi dà fastidio è la necessità implicita di 'Task.Delay'. In realtà non dovrebbe essere necessario se il codice è corretto –

+0

@BerinLoritsch : Sono d'accordo, l'ho lasciato perché so che alcuni servizi negheranno le richieste se vengono colpiti troppo velocemente dalla stessa fonte, quindi è possibile che fosse lì per il gusto della limitazione, ma ho aggiunto un altro esempio senza di esso. – StriplingWarrior

+0

Grazie per la risposta, Berin è riuscito a rispondere per primo, così ho iniziato con il loro codice, ma se dovessi incorrere in ulteriori problemi, darò un'occhiata anche a questo :) –

Problemi correlati