2015-07-21 10 views
8

Sto cercando di capire come utilizzare la nuova libreria akka.http. Vorrei inviare una richiesta http a un server e leggere l'intero corpo della risposta come una singola stringa per produrre un Source[String,?].Ottieni l'intero corpo HttpResponse come una stringa con Akka-Streams HTTP

Ecco la soluzione migliore sono stato in grado di produrre fino ad ora:

def get(
    modelID: String, 
    pool: Flow[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool] 
): Source[String,Unit] = { 
    val uri = reactionsURL(modelID) 
    val req = HttpRequest(uri = uri) 
    Source.single((req,0)) 
    .via(pool) 
    .map { 
     case (Success(resp),_) => 
     resp.entity.dataBytes.map(_.decodeString("utf-8")) 
    }.flatten(FlattenStrategy.concat) 
    .grouped(1024) 
    .map(_.mkString) 

Sembra funzionare bene (tranne il percorso di errore mancante), ma è un po 'goffo per tali compiti semplici. C'è una soluzione più intelligente? Posso evitare lo grouped/mkString?

risposta

11

È possibile utilizzare il metodo toStrict di HttpResponse con timeout. Raccoglie una risposta completa come futuro.

def toStrict (timeout: FiniteDuration) (implicito CE: ExecutionContext, fm: Materializer): Future [rigoroso] Restituisce un

copia condivisibile e serializzabile di questo messaggio con un'entità rigorosa.

Esempio:

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.{HttpResponse, HttpRequest} 
import akka.stream.{Materializer, ActorMaterializer} 
import akka.stream.scaladsl.{Sink, Flow, Source} 
import scala.concurrent.{ExecutionContext, Future} 
import scala.concurrent.duration._ 

import scala.util.{Try, Success} 

object Main extends App { 

    implicit val system = ActorSystem() 

    import system.dispatcher 

    implicit val materializer = ActorMaterializer() 

    val host = "127.0.0.1" 
    lazy val pool = Http().newHostConnectionPool[Int](host, 9000) 

    FlowBuilder.get("/path", pool).to(Sink.foreach(_.foreach(println))).run() 

} 

object FlowBuilder { 
    def get(modelID: String, pool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool]) 
     (implicit ec: ExecutionContext, mat: Materializer): Source[Future[String], Unit] = { 
    val uri = modelID 
    val req = HttpRequest(uri = modelID) 
    Source.single((req, 0)).via(pool) 
     .map { 
     case (Success(resp), _) => resp.entity.toStrict(5 seconds).map(_.data.decodeString("UTF-8")) 
    } 
    } 
} 
+0

OK. Ma ho bisogno di un 'Source [String, Unit] perché ho ulteriori trasformazioni da applicare. Con la tua soluzione, avrò bisogno di passare lungo Materializer ed ExecuctionContext e poi riavvolgere eveything in una nuova Sorgente ... Non è più clunkier o mi sono perso qualcosa? – paradigmatic

+0

@paradigmatic Aggiungo un esempio. ExecutionContext e materializer potrebbero essere impostati come argomenti impliciti. – Zernike

7

È possibile utilizzare Unmarshall che funziona anche su altri tipi per esempio JSON da spray-json. Anche questo come strict restituisce Future[_].

Esempio:

authedReq.via(authServerReqResFlow).mapAsync(1) { case (tryRes, _) => 
     tryRes match { 
     case Failure(exception) => Future.failed[Principal](exception) 
     case Success(response @ HttpResponse(StatusCodes.OK,_,_,_)) => 
      val userContext = Unmarshal(response).to[UserContextData] 
      userContext.map { 
      case UserContextData(UserInfo(_, userName, fullName, email, title), _, _) => 
       Principal(userName, fullName, email, title) 
      } 
     case Success(response @ HttpResponse(responseCode,_,entity,_)) => 
      Unmarshal(entity).to[String].flatMap(msg => Future.failed(new AuthenticationFailure(s"$responseCode\n$msg"))) 
     } 
    } 
Problemi correlati