2012-05-07 32 views
10

Problema: Vorrei scaricare 100 file in parallelo da AWS S3 utilizzando il proprio SDK .NET. Il contenuto scaricato deve essere memorizzato in 100 flussi di memoria (i file sono abbastanza piccoli e posso prenderli da lì). Mi sto confondendo tra Task, IAsyncResult, Parallel. * E altri approcci diversi in .NET 4.0.Download di file batch paralleli da Amazon S3 tramite AWS S3 SDK per .NET

Se provo a risolvere il problema io stesso, al largo della parte superiore della mia testa mi immagino qualcosa di simile a questo pseudocodice: (a cura di aggiungere tipi di alcune variabili)

using Amazon; 
using Amazon.S3; 
using Amazon.S3.Model; 

AmazonS3 _s3 = ...; 
IEnumerable<GetObjectRequest> requestObjects = ...; 


// Prepare to launch requests 
var asyncRequests = from rq in requestObjects 
    select _s3.BeginGetObject(rq,null,null); 

// Launch requests 
var asyncRequestsLaunched = asyncRequests.ToList(); 

// Prepare to finish requests 
var responses = from rq in asyncRequestsLaunched 
    select _s3.EndGetRequest(rq); 

// Finish requests 
var actualResponses = responses.ToList(); 

// Fetch data 
var data = actualResponses.Select(rp => { 
    var ms = new MemoryStream(); 
    rp.ResponseStream.CopyTo(ms); 
    return ms; 
}); 

Questo codice lancia 100 richieste in parallelo, che è buono. Tuttavia, ci sono due problemi:

  1. L'ultima istruzione scaricherà i file in serie, non in parallelo. Non sembra essere il metodo BeginCopyTo()/EndCopyTo() sullo stream ...
  2. L'istruzione precedente non verrà rilasciata fino a quando tutte le richieste non avranno risposto. In altre parole, nessuno dei file inizierà il download finché tutti non inizieranno.

Così qui comincio a pensare che sto dirigendo sulla strada sbagliata ...

Aiuto?

risposta

19

È probabilmente più semplice se si interrompe l'operazione in un metodo che gestirà la richiesta una in modo asincrono e quindi la chiamerà 100 volte.

Per iniziare, identifichiamo il risultato finale desiderato. Poiché ciò con cui lavorerai è un MemoryStream significa che dovrai restituire uno Task<MemoryStream> dal tuo metodo. La firma sarà simile a questo:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request) 

Perché il vostro oggetto AmazonS3 implementa il Asynchronous Design Pattern, è possibile utilizzare il FromAsync method sul TaskFactory class per generare un Task<T> da una classe che implementa il modello asincrono design, in questo modo:

Quindi sei già in un buon posto, hai un Task<T> che puoi aspettare o ricevere una richiamata quando la chiamata termina. Tuttavia, è necessario in qualche modo tradurre il GetObjectResponse restituito dalla chiamata a Task<GetObjectResponse> in un MemoryStream.

A tal fine, si desidera utilizzare ContinueWith method nella classe Task<T>. Consideralo come la versione asincrona di nello , ma è una proiezione asincrona su Task<T>, ad eccezione del fatto che ogni volta che chiami ContinueWith, stai creando una nuova attività che esegue nella sezione di codice.

Con questo, il metodo appare come il seguente:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request) 
{ 
    // Start the task of downloading. 
    Task<GetObjectResponse> response = 
     Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
      s3.BeginGetObject, s3.EndGetObject, request, null 
     ); 

    // Translate. 
    Task<MemoryStream> translation = response.ContinueWith(t => { 
     using (Task<GetObjectResponse> resp = t){ 
      var ms = new MemoryStream(); 
      t.Result.ResponseStream.CopyTo(ms); 
      return ms; 
     } 
    }); 

    // Return the full task chain. 
    return translation; 
} 

Si noti che in quanto sopra si può forse chiamare il overload of ContinueWith passando TaskContinuationOptions.ExecuteSynchronously, come appare si sta facendo un lavoro minimo (non posso dire, le risposte potrebbero essere enorme). Nei casi in cui si sta eseguendo un lavoro molto minimale in cui sarebbe dannoso iniziare una nuova attività per completare il lavoro, è necessario passare TaskContinuationOptions.ExecuteSynchronously in modo da non perdere tempo a creare nuove attività per operazioni minime.

Ora che avete il metodo che può tradursi uno richiesta in un Task<MemoryStream>, la creazione di un wrapper che elaborerà qualsiasi numero di loro è semplice:

static Task<MemoryStream>[] GetMemoryStreamsAsync(AmazonS3 s3, 
    IEnumerable<GetObjectRequest> requests) 
{ 
    // Just call Select on the requests, passing our translation into 
    // a Task<MemoryStream>. 
    // Also, materialize here, so that the tasks are "hot" when 
    // returned. 
    return requests.Select(r => GetMemoryStreamAsync(s3, r)). 
     ToArray(); 
} 

In quanto sopra, è sufficiente prendere una sequenza delle istanze GetObjectRequest e restituirà un array di Task<MemoryStream>. Il fatto che restituisca una sequenza materializzata è importante. Se non lo si materializza prima di tornare, le attività non verranno create finché la sequenza non viene iterata.

Naturalmente, se si desidera questo comportamento, quindi con tutti i mezzi, è sufficiente rimuovere la chiamata a .ToArray(), restituire il metodo IEnumerable<Task<MemoryStream>> e quindi le richieste verranno effettuate durante l'iterazione attraverso le attività.

Da lì, è possibile elaborare uno alla volta (utilizzando il Task.WaitAny method in un ciclo) o attendere per tutti loro per essere completato (chiamando il Task.WaitAll method). Un esempio di quest'ultimo sarebbe:

static IList<MemoryStream> GetMemoryStreams(AmazonS3 s3, 
    IEnumerable<GetObjectRequest> requests) 
{ 
    Task<MemoryStream>[] tasks = GetMemoryStreamsAsync(s3, requests); 
    Task.WaitAll(tasks); 
    return tasks.Select(t => t.Result).ToList(); 
} 

Inoltre, va detto che questa è una abbastanza buona misura per la Reactive Extensions framework, in quanto questo molto ben adattato verso un IObservable<T> attuazione.

+2

Questa è un'ottima soluzione, che è incredibilmente ben descritta e consegnata in circa 20 minuti dopo aver postato la domanda. Sono deliziato. Ha funzionato bene anche per me, dopo aver apportato le correzioni per aggiungere nomi di classe S3 più precisi e specificare un metodo FromAsync() più specifico. Casper, vuoi che io modifichi le modifiche nella tua risposta? – DenNukem

+1

@DenNukem Oh, non ho affrontato la copia in modo asincrono da uno stream all'altro. Questo sarà disponibile in .NET 4.5, ma richiederebbe una certa "asincrona"/"attesa" per far sì che non assomigli a un disastro ferroviario. Per ora, usa il [metodo 'Stream.CopyTo'] (http://msdn.microsoft.com/en-us/library/system.io.stream.copyto.aspx) ma sappi che in .NET 4.5 puoi usare ['Stream.CopyToAsync'] (http://msdn.microsoft.com/en-us/library/system.io.stream.copytoasync.aspx) insieme a' async'/'await' per fare * tutto * di questo più elegante. – casperOne

+0

@casperOne Posso vedere un esempio di .NET 4.5 come farlo? – user1265146

Problemi correlati