2016-01-15 10 views
7

Ho un servizio (chiamiamolo Servizio A) che utilizza Akka Server HTTP per gestire le richieste in arrivo. Inoltre ho un'applicazione di terze parti (servizio B) che fornisce diversi servizi web. Lo scopo del servizio A è trasformare le richieste dei client, chiamare uno o più servizi Web del servizio B, unire/trasformare i risultati e restituirli a un cliente.Come chiamare correttamente un singolo server da più attori/gestori web usando Akka HTTP?

Sto utilizzando gli attori per alcune parti e solo il futuro per gli altri. Per effettuare una chiamata al servizio B, utilizzo il client HTTP Akka.

Http.get(actorSystem).singleRequest(HttpRequest.create() 
     .withUri("http://127.0.0.1:8082/test"), materializer) 
     .onComplete(...) 

Il problema è, un nuovo flusso viene creato per ogni servizio A richiesta, e se ci sono più connessioni simultanee, si traduce in akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] error

ho già fatto questa domanda e ottenuto un suggerimento di utilizzare un singolo flusso How to properly call Akka HTTP client for multiple (10k - 100k) requests?

Mentre funziona per un gruppo di richieste provenienti da un singolo luogo, non so come utilizzare un singolo flusso da tutti i miei gestori simultanei di richieste.

Qual è la corretta "via Akka" per farlo?

risposta

12

Penso che si potrebbe usare Source.queue per tamponare le vostre richieste. Il codice seguente presuppone che è necessario ottenere la risposta dal servizio di terze parti, quindi avere un Future[HttpResponse] è molto gradito. In questo modo è anche possibile fornire una strategia di overflow per prevenire la fame di risorse.

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.{HttpRequest, HttpResponse} 
import akka.stream.scaladsl.{Keep, Sink, Source} 
import akka.stream.{ActorMaterializer, OverflowStrategy} 

import scala.concurrent.duration._ 
import scala.concurrent.{Await, Future, Promise} 
import scala.util.{Failure, Success} 

import scala.concurrent.ExecutionContext.Implicits.global 

implicit val system = ActorSystem("main") 
implicit val materializer = ActorMaterializer() 
val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host = "google.com", port = 80) 
val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](10, OverflowStrategy.dropNew) 
    .via(pool) 
    .toMat(Sink.foreach({ 
    case ((Success(resp), p)) => p.success(resp) 
    case ((Failure(e), p)) => p.failure(e) 
    }))(Keep.left) 
    .run 
val promise = Promise[HttpResponse] 
val request = HttpRequest(uri = "/") -> promise 

val response = queue.offer(request).flatMap(buffered => { 
    if (buffered) promise.future 
    else Future.failed(new RuntimeException()) 
}) 

Await.ready(response, 3 seconds) 

(codice copiato dal mio blog post)

+0

Bello, grazie! In realtà ho contattato gli sviluppatori sul gitter e ho anche ricevuto un suggerimento per utilizzare Source.queue e promesse. Ma non avevo ancora il tempo di provarlo. Con il tuo codice sarà più facile! – relgames

+0

@khiramatsu hai un esempio in Java? – Tiago

+0

@khiramatsu Questa coda può essere offerta da più thread? – Vigneshwaran

0

Tutto ciò che dovete fare è impostare un HostConnectionPool su Servizio B all'interno del vostro codice A di servizio. Questo ti darà un Flow che può essere aggiunto al tuo Servizio. Flussi per inviare richieste da A a B usando un pool di connessioni invece di una nuova connessione per flusso. Dalla documentazione:

A differenza di Connection livello API lato client API a livello di host si solleva da gestire manualmente i singoli connessioni HTTP. Lo gestisce autonomamente un pool configurabile di connessioni a un particolare endpoint di destinazione (cioè combinazione host/porta).

Ogni materializzazione di tale flusso, in diversi flussi, trarrà da questo pool sottostante di connessioni:

Il modo migliore per ottenere una sospensione di un pool di connessioni ad un dato bersaglio endpoint è il Metodo Http.get(system).cachedHostConnectionPool(...), che restituisce un valore Flow che può essere "cotto" in una configurazione dello stream a livello di applicazione . Questo flusso è anche chiamato "flusso del cliente del pool".

+0

No, non ha funzionato, ho akka.stream.OverflowStrategy $ Fail $ BufferOverflowException: Superato configurato valore max-aperto-richieste di [4]. Ecco il mio codice https: //gist.github.com/relgames/0c2005bae42922ab2da3 – relgames

+0

@relgames Penso che tu debba refactoring il tuo codice un po '. Non ci dovrebbe essere un Source.single all'interno di un ciclo for. Invece, il ciclo for dovrebbe essere parte del flusso. Come esempio in scala: 'Source (da 1 a 100) .map (i => (HttpRequest.create ("/test "), i) .via (flusso)' ... Ciò impedisce la materializzazione di 100 flussi, e utilizza un singolo flusso. –

+1

Il servizio A gestisce le richieste HTTP utilizzando il DSL HTTP. Un gestore chiama quindi il servizio B utilizzando l'API client HTTP e inoltra i risultati a un richiedente. Quindi nel programma reale non esiste un ciclo for, ma multiplo richieste provenienti da client esterni Stavo solo provando a imitarla usando un ciclo for.La mia domanda iniziale è esattamente su questo: come faccio a usare il singolo flusso (e il materializzatore singolo?) da più thread/servizi distribuiti nel tempo. – relgames

3

Ecco versione Java del accepted answer

final Flow< 
    Pair<HttpRequest, Promise<HttpResponse>>, 
    Pair<Try<HttpResponse>, Promise<HttpResponse>>, 
    NotUsed> flow = 
    Http.get(actorSystem).superPool(materializer); 

final SourceQueue<Pair<HttpRequest, Promise<HttpResponse>>> queue = Source.<Pair<HttpRequest, Promise<HttpResponse>>> 
    queue(BUFFER_SIZE, OverflowStrategy.dropNew()) 
    .via(flow) 
     .toMat(Sink.foreach(p -> p.second().complete(p.first())), Keep.left()) 
     .run(materializer); 

... 

public CompletionStage<HttpResponse> request(HttpRequest request) { 
    log.debug("Making request {}", request); 

    Promise<HttpResponse> promise = Futures.promise(); 
    return queue.offer(Pair.create(request, promise)) 
     .thenCompose(buffered -> { 
      if (buffered instanceof QueueOfferResult.Enqueued$) { 
       return FutureConverters.toJava(promise.future()) 
        .thenApply(resp -> { 
         if (log.isDebugEnabled()) { 
          log.debug("Got response {} {}", resp.status(), resp.getHeaders()); 
         } 
         return resp; 
        }); 
      } else { 
       log.error("Could not buffer request {}", request); 
       return CompletableFuture.completedFuture(HttpResponse.create().withStatus(StatusCodes.SERVICE_UNAVAILABLE)); 
      } 
     }); 
} 
Problemi correlati