2015-10-03 25 views
6

Sto tentando di implementare un semplice caricamento di file utilizzando akka http. Il mio tentativo appare come segue:Akka-HTTP: caricamento file

import akka.actor.ActorSystem 
    import akka.event.{LoggingAdapter, Logging} 
    import akka.http.scaladsl.Http 
    import akka.http.scaladsl.model.{HttpResponse, HttpRequest} 
    import akka.http.scaladsl.model.StatusCodes._ 
    import akka.http.scaladsl.server.Directives._ 
    import akka.stream.{ActorMaterializer, Materializer} 
    import com.typesafe.config.Config 
    import com.typesafe.config.ConfigFactory 
    import scala.concurrent.{ExecutionContextExecutor, Future} 
    import akka.http.scaladsl.model.StatusCodes 
    import akka.http.scaladsl.model.HttpEntity 
    import java.io._ 
    import akka.stream.io._ 

    object UploadTest extends App { 
     implicit val system = ActorSystem() 
     implicit val executor = system.dispatcher 
     implicit val materializer = ActorMaterializer() 

     val config = ConfigFactory.load() 
     val logger = Logging(system, getClass) 

     val routes = { 
     pathSingleSlash { 
      (post & extractRequest) { 
      request => { 
       val source = request.entity.dataBytes 
       val outFile = new File("/tmp/outfile.dat") 
       val sink = SynchronousFileSink.create(outFile) 
       source.to(sink).run() 
       complete(HttpResponse(status = StatusCodes.OK)) 
      } 
      } 
     } 
     } 

     Http().bindAndHandle(routes, config.getString("http.interface"), config.getInt("http.port")) 

    } 

Ci sono diversi problemi con questo codice:

  1. file più grandi della dimensione dell'entità configurato non può essere caricato: Request Content-Length 24090745 exceeds the configured limit of 8388608
  2. esecuzione di due arrivi di fila causare un'eccezione dead letters encountered..

Qual è il modo migliore per superare i limiti di dimensioni e in che modo posso chiudere correttamente il file in modo che un caricamento successivo sovrascriva il file esistente (ignorando i caricamenti simultanei per il momento)?

risposta

8

Per il punto 2, penso che source.to(sink).run() esegua l'operazione in modo asincrono. Si materializza un Future. Pertanto la tua richiesta HTTP può tornare prima che la scrittura del file sia completata, quindi se inizi un secondo caricamento sul client non appena la prima richiesta ritorna, il primo potrebbe non aver finito di scrivere sul file.

è possibile utilizzare la direttiva onComplete o onSuccess per completare solo la richiesta HTTP in cui il futuro viene completata:

http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/http/directives/alphabetically.html

http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/http/routing-dsl/directives/future-directives/onSuccess.html

EDIT:

Per il problema lunghezza del contenuto, uno cosa che puoi fare è aumentare la dimensione di quella proprietà in application.conf. Il valore predefinito è:

akka.server.parsing.max-content-length = 8m 

Vedi http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/http/configuration.html

+0

Grazie, con onSuccess risolve il problema "lettere morte incontrate". –

+0

Ho aggiunto una modifica con un'idea per l'altro problema – mattinbits

+0

Ciao, Grazie, ho appreso che impostare la lunghezza massima del contenuto su un valore maggiore non significa che l'intero contenuto sia memorizzato nella memoria. Quindi questo è un modo sicuro per risolvere questo. Avrei upvoted la tua risposta ma non ho abbastanza crediti :-( –

5

Riassumendo mattinbits commenti, la soluzione follwing funziona:

  1. Aumentare akka.server.parsing.max-content-length
  2. Utilizzando onSuccess

Ecco un frammento di il codice:

val routes = { 
    pathSingleSlash { 
    (post & extractRequest) { 
     request => { 
     val source = request.entity.dataBytes 
     val outFile = new File("/tmp/outfile.dat") 
     val sink = SynchronousFileSink.create(outFile) 
     val repl = source.runWith(sink).map(x => s"Finished uploading ${x} bytes!") 
     onSuccess(repl) { repl => 
      complete(HttpResponse(status = StatusCodes.OK, entity = repl)) 
     } 
     } 
    } 
    }