2014-12-21 13 views
5

Sto costruendo un'applicazione Play Framework in Scala in cui vorrei eseguire lo streaming di una matrice di byte su S3. Sto usando la libreria Play-S3 per fare questo. Il "upload di file Multipart" della sezione documentazione è ciò che è rilevante qui:Utilizzo di Iteratees e Enumerators in Play Scala per lo streaming dei dati su S3

// Retrieve an upload ticket 
val result:Future[BucketFileUploadTicket] = 
    bucket initiateMultipartUpload BucketFile(fileName, mimeType) 

// Upload the parts and save the tickets 
val result:Future[BucketFilePartUploadTicket] = 
    bucket uploadPart (uploadTicket, BucketFilePart(partNumber, content)) 

// Complete the upload using both the upload ticket and the part upload tickets 
val result:Future[Unit] = 
    bucket completeMultipartUpload (uploadTicket, partUploadTickets) 

che sto cercando di fare la stessa cosa nella mia richiesta, ma con Iteratee s e Enumerator s.

I corsi d'acqua e asincronia rendere le cose un po 'complicato, ma qui è quello che ho finora (Nota uploadTicket è definito in precedenza nel codice):

val partNumberStream = Stream.iterate(1)(_ + 1).iterator 
val partUploadTicketsIteratee = Iteratee.fold[Array[Byte], Future[Vector[BucketFilePartUploadTicket]]](Future.successful(Vector.empty[BucketFilePartUploadTicket])) { (partUploadTickets, bytes) => 
    bucket.uploadPart(uploadTicket, BucketFilePart(partNumberStream.next(), bytes)).flatMap(partUploadTicket => partUploadTickets.map(_ :+ partUploadTicket)) 
} 
(body |>>> partUploadTicketsIteratee).andThen { 
    case result => 
    result.map(_.map(partUploadTickets => bucket.completeMultipartUpload(uploadTicket, partUploadTickets))) match { 
     case Success(x) => x.map(d => println("Success")) 
     case Failure(t) => throw t 
    } 
} 

Tutto compilato ed eseguito senza incidenti. Infatti, viene stampato "Success", ma nessun file appare mai su S3.

risposta

5

Potrebbero esserci più problemi con il codice. È un po 'illeggibile causato dalle chiamate al metodo map. Potresti avere un problema con la tua composizione futura. Un altro problema potrebbe essere causato dal fatto che tutti i blocchi (tranne l'ultimo) dovrebbero essere almeno 5 MB.

Il codice seguente non è stato testato, ma mostra un approccio diverso. L'approccio iteratee è quello in cui è possibile creare piccoli blocchi di edifici e comporli in una pipa di operazioni.

per rendere il codice di compilazione ho aggiunto un tratto e alcuni metodi

trait BucketFilePartUploadTicket 
val uploadPart: (Int, Array[Byte]) => Future[BucketFilePartUploadTicket] = ??? 
val completeUpload: Seq[BucketFilePartUploadTicket] => Future[Unit] = ??? 
val body: Enumerator[Array[Byte]] = ??? 

Qui creiamo alcune parti

// Create 5MB chunks 
val chunked = { 
    val take5MB = Traversable.takeUpTo[Array[Byte]](1024 * 1024 * 5) 
    Enumeratee.grouped(take5MB transform Iteratee.consume()) 
} 

// Add a counter, used as part number later on 
val zipWithIndex = Enumeratee.scanLeft[Array[Byte]](0 -> Array.empty[Byte]) { 
    case ((counter, _), bytes) => (counter + 1) -> bytes 
} 

// Map the (Int, Array[Byte]) tuple to a BucketFilePartUploadTicket 
val uploadPartTickets = Enumeratee.mapM[(Int, Array[Byte])](uploadPart.tupled) 

// Construct the pipe to connect to the enumerator 
// the ><> operator is an alias for compose, it is more intuitive because of 
// it's arrow like structure 
val pipe = chunked ><> zipWithIndex ><> uploadPartTickets 

// Create a consumer that ends by finishing the upload 
val consumeAndComplete = 
    Iteratee.getChunks[BucketFilePartUploadTicket] mapM completeUpload 

corso è fatto semplicemente collegando le parti

// This is the result, a Future[Unit] 
val result = body through pipe run consumeAndComplete 

Nota che non ho testato alcun codice e che potrebbero aver commesso degli errori nel mio approccio. Questo tuttavia mostra un modo diverso di affrontare il problema e probabilmente dovrebbe aiutarti a trovare una buona soluzione.

Si noti che questo approccio attende che una parte completi il ​​caricamento prima di eseguire la parte successiva. Se la connessione dal server a Amazon è più lenta della connessione dal browser al server, questo meccanismo rallenterà l'input.

È possibile adottare un altro approccio in cui non si attende il completamento dello Future del caricamento della parte. Ciò comporterebbe un altro passaggio in cui si utilizza Future.sequence per convertire la sequenza di futuri di caricamento in un singolo futuro contenente una sequenza dei risultati. Il risultato sarebbe un meccanismo che invia una parte ad Amazon non appena si dispone di dati sufficienti.

+0

Ci sono sempre più problemi con il mio codice. Cos'altro è nuovo? Ma è certamente vero che i blocchi non sono 5 MB in tutti i casi, quindi questo è un problema. Comunque, proverò le tue idee e vedrò cosa posso fare. – Vidya

Problemi correlati