2015-12-15 8 views
11

Sto provando a scrivere uno strumento per il caricamento di dati in batch usando Akka HTTP 2.0-M2. Ma io sto affrontando akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] error.Come chiamare correttamente il client HTTP Akka per più richieste (10k - 100k)?

Ho cercato di isolare un problema e qui è il codice di esempio che non riesce anche:

public class TestMaxRequests { 
    private static final class Router extends HttpApp { 
     @Override 
     public Route createRoute() { 
      return route(
        path("test").route(
          get(handleWith(ctx -> ctx.complete("OK"))) 
        ) 
      ); 
     } 
    } 


    public static void main(String[] args) { 
     ActorSystem actorSystem = ActorSystem.create(); 
     Materializer materializer = ActorMaterializer.create(actorSystem); 

     Router router = new Router(); 
     router.bindRoute("127.0.0.1", 8082, actorSystem); 

     LoggingAdapter log = Logging.getLogger(actorSystem, new Object()); 

     for (int i = 0; i < 100; i++) { 
      final int reqNum = i; 
      Http.get(actorSystem).singleRequest(HttpRequest.create().withUri("http://127.0.0.1:8082/test"), materializer) 
        .onComplete(new OnComplete<HttpResponse>() { 
         @Override 
         public void onComplete(Throwable failure, HttpResponse response) throws Throwable { 
          if (failure != null) { 
           log.error(failure, "Failed: {}", reqNum); 
          } else { 
           log.info("Success: {}, consuming stream...", reqNum); 
           response.entity().getDataBytes().runWith(Sink.ignore(), materializer); 
           log.info("Success: {}, consumed stream", reqNum); 
          } 
         } 
        }, actorSystem.dispatcher()); 
     } 
    } 
} 

non riesce con:

[2015-12-15 16:17:32,609] [ INFO] [] [] a.e.s.Slf4jLogger: Slf4jLogger started 
[2015-12-15 16:17:32,628] [ DEBUG] [main] [EventStream(akka://default)] a.e.EventStream: logger log1-Slf4jLogger started 
[2015-12-15 16:17:32,636] [ DEBUG] [main] [EventStream(akka://default)] a.e.EventStream: Default Loggers started 
[2015-12-15 16:17:33,531] [ DEBUG] [spatcher-3] [akka://default/system/IO-TCP/selectors/$a/0] a.i.TcpListener: Successfully bound to /127.0.0.1:8082 
[2015-12-15 16:17:33,624] [ DEBUG] [spatcher-7] [akka://default/user/PoolInterfaceActor-0] a.h.i.e.c.PoolInterfaceActor: (Re-)starting host connection pool to 127.0.0.1:8082 
[2015-12-15 16:17:33,736] [ DEBUG] [spatcher-8] [akka://default/user/SlotProcessor-0] a.h.i.e.c.PoolSlot$SlotProcessor: become unconnected, from subscriber pending 
[2015-12-15 16:17:33,748] [ DEBUG] [patcher-11] [akka://default/user/SlotProcessor-3] a.h.i.e.c.PoolSlot$SlotProcessor: become unconnected, from subscriber pending 
[2015-12-15 16:17:33,758] [ DEBUG] [spatcher-9] [akka://default/user/SlotProcessor-2] a.h.i.e.c.PoolSlot$SlotProcessor: become unconnected, from subscriber pending 
[2015-12-15 16:17:33,762] [ DEBUG] [spatcher-9] [akka://default/user/SlotProcessor-1] a.h.i.e.c.PoolSlot$SlotProcessor: become unconnected, from subscriber pending 
[2015-12-15 16:17:33,779] [ ERROR] [patcher-11] [Object(akka://default)] j.l.Object: Failed: 36 
akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] 
    at akka.http.impl.engine.client.PoolInterfaceActor$$anonfun$receive$1.applyOrElse(PoolInterfaceActor.scala:120) ~[akka-http-core-experimental_2.11-2.0-M2.jar:na] 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:480) ~[akka-actor_2.11-2.4.0.jar:na] 
    at akka.http.impl.engine.client.PoolInterfaceActor.akka$stream$actor$ActorSubscriber$$super$aroundReceive(PoolInterfaceActor.scala:48) ~[akka-http-core-experimental_2.11-2.0-M2.jar:na] 
    at akka.stream.actor.ActorSubscriber$class.aroundReceive(ActorSubscriber.scala:201) ~[akka-stream-experimental_2.11-2.0-M2.jar:na] 
    at akka.http.impl.engine.client.PoolInterfaceActor.akka$stream$actor$ActorPublisher$$super$aroundReceive(PoolInterfaceActor.scala:48) ~[akka-http-core-experimental_2.11-2.0-M2.jar:na] 
    at akka.stream.actor.ActorPublisher$class.aroundReceive(ActorPublisher.scala:309) ~[akka-stream-experimental_2.11-2.0-M2.jar:na] 
    at akka.http.impl.engine.client.PoolInterfaceActor.aroundReceive(PoolInterfaceActor.scala:48) ~[akka-http-core-experimental_2.11-2.0-M2.jar:na] 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:525) [akka-actor_2.11-2.4.0.jar:na] 
    at akka.actor.ActorCell.invoke(ActorCell.scala:494) [akka-actor_2.11-2.4.0.jar:na] 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) [akka-actor_2.11-2.4.0.jar:na] 
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) [akka-actor_2.11-2.4.0.jar:na] 
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) [akka-actor_2.11-2.4.0.jar:na] 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.11.7.jar:na] 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.11.7.jar:na] 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.7.jar:na] 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.7.jar:na] 
[2015-12-15 16:17:33,780] [ ERROR] [patcher-20] [Object(akka://default)] j.l.Object: Failed: 48 

Credo che sia perché io Sto cercando di creare molti Futures e li eseguo tutti in una volta. Ma Akka non dovrebbe consentire la contropressione? Credo che lo stia usando male. Ho provato i metodi superPool ma nulla è cambiato perché, come ho capito, lo Http.singleRequest ha la stessa piscina all'interno. Ho anche provato a riutilizzare l'istanza di Http invece di chiamare Http.get() nel ciclo, ma non ha aiutato.

Qual è il modo corretto di licenziare una serie di richieste? Sto pianificando di eseguire batch di 10.000 - 100.000 richieste.

risposta

13

Akka abilita assolutamente la contropressione, semplicemente non ne approfittate. Invece di inviare più richieste singole, puoi utilizzare un singolo Flow per inviare tutte le tue richieste. Dal documentation:

final Flow<HttpRequest, HttpResponse, Future<OutgoingConnection>> connectionFlow = 
    Http.get(actorSystem).outgoingConnection("127.0.0.1", 8082); 

È quindi possibile utilizzare questo flusso per elaborare i HttpRequest oggetti:

HttpRequest req = HttpRequest.GET("/test") 

//imitates your for-loop example of 100 requests 
Source.from(() -> Collections.nCopies(100, req).iterator()) 
     .via(connectionFlow) 
     .runForeach(...) 
+0

ho perso la parte di utilizzo di più richieste in una Source.from()! Grazie!! – relgames

+0

@relgames Sei il benvenuto. Happy hacking! –

+0

@RamonJRomeroyVigil Come posso utilizzare il flusso con la contropressione se non riesco a creare un mucchio di richieste in anticipo? Per esempio diciamo che sto richiedendo qualcosa in base agli ID restituiti in qualche API paginata. Quindi voglio usare il flusso mentre elaboro le risposte dalle richieste precedenti. – expert

Problemi correlati