2012-03-22 15 views
5

prima, mi permetta di spiegare il contesto:inviare più richieste asynchonous su un client Netty

Ho avuto modo di creare un client che invierà molte richieste HTTP per scaricare le immagini. Queste richieste devono essere asincrone perché non appena un'immagine è completata verrà aggiunta a una coda e quindi stampata sullo schermo. Poiché le immagini possono essere grandi e le risposte possono essere suddivise, il mio gestore deve aggregarle in un buffer.

Quindi seguo i codici degli esempi Netty (HTTP spoon example).

Attualmente, ho tre mappe statiche per memorizzare per ciascun canale l'ID del canale e il buffer/blocco booleano/il mio oggetto finale.

private static final ConcurrentHashMap<Integer, ChannelBuffer> BUFFER_MAP = new ConcurrentHashMap<Integer, ChannelBuffer>(); 
private static final ConcurrentHashMap<Integer, ImagePack> PACK_MAP = new ConcurrentHashMap<Integer, ImagePack>(); 
private static final ConcurrentHashMap<Integer, Boolean> CHUNKS_MAP = new ConcurrentHashMap<Integer, Boolean>(); 

Dopodiché, creo il mio client bootstrap e il contatore per contare il numero di richieste in sospeso. La coda finale e il contatore vengono passati al mio gestore per quando l'immagine di risposta è completa.

final ClientBootstrap bootstrap = new ClientBootstrap(
      new NioClientSocketChannelFactory(
      Executors.newCachedThreadPool(), 
      Executors.newCachedThreadPool())); 
    bootstrap.setOption("keepAlive", true); 
    bootstrap.setOption("tcpNoDelay", true); 
    bootstrap.setOption("reuseAddress", true); 
    bootstrap.setOption("connectTimeoutMillis", 30000); 


    final CountDownLatch latch = new CountDownLatch(downloadList.size()) { 

     @Override 
     public void countDown() { 
      super.countDown(); 
      if (getCount() <= 0) { 
       try { 
        queue.put(END_OF_QUEUE); 
        bootstrap.releaseExternalResources(); 
       } catch (InterruptedException ex) { 
        LOGGER.log(Level.WARNING, ex.getMessage(), ex); 
       } 
      } 
     } 
    }; 
    bootstrap.getPipeline().addLast("codec", new HttpClientCodec()); 
    bootstrap.getPipeline().addLast("handler", new TileClientHandler(queue, latch)); 

Dopo di che ho creare un canale per ogni immagine da scaricare e quando è collegato il canale, verrà creata la richiesta e inviare. L'host e la porta sono già stati estratti prima.

for (final ImagePack pack : downloadList) { 

     final ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); 

     future.addListener(new ChannelFutureListener() { 

      public void operationComplete(ChannelFuture cf) throws Exception { 

       final Channel channel = future.getChannel(); 

       PACK_MAP.put(channel.getId(), pack); 

       final HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, pack.url); 
       request.setHeader(HttpHeaders.Names.HOST, host); 
       request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); 
       request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.BYTES); 

       if (channel.isWritable()) { 
        channel.write(request); 
       } 
      } 
     }); 
    } 

Ora, questo è il mio ChannelHandler, che è una classe interna che si estendono SimpleChannelUpstreamHandler. Quando il canale è collegato, viene creata una nuova voce in BUFFER_MAP e in CHUNKS_MAP. Lo BUFFER_MAP contiene tutti i buffer di immagini utilizzati dal gestore per aggregare blocchi di immagini dai canali e CHUNKS_MAP contiene una risposta booleana di tipo chunking. Quando la risposta è completa, l'immagine InputSteam viene aggiunta alla coda, il conto alla rovescia e il canale chiuso.

private class TileClientHandler extends SimpleChannelUpstreamHandler { 

    private CancellableQueue<Object> queue; 
    private CountDownLatch latch; 

    public TileClientHandler(final CancellableQueue<Object> queue, final CountDownLatch latch) { 
     this.queue = queue; 
     this.latch = latch; 
    } 

    @Override 
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 
     if(!BUFFER_MAP.contains(ctx.getChannel().getId())){ 
      BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000)); 
     } 
     if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){ 
      CHUNKS_MAP.put(ctx.getChannel().getId(), false); 
     } 
    } 

    @Override 
    public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception { 
     super.writeComplete(ctx, e); 
     if(!BUFFER_MAP.contains(ctx.getChannel().getId())){ 
      BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000)); 
     } 
     if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){ 
      CHUNKS_MAP.put(ctx.getChannel().getId(), false); 
     } 
    } 

    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 
     final Integer channelID = ctx.getChannel().getId(); 
     if (!CHUNKS_MAP.get(channelID)) { 
      final HttpResponse response = (HttpResponse) e.getMessage(); 

      if (response.isChunked()) { 
       CHUNKS_MAP.put(channelID, true); 

      } else { 
       final ChannelBuffer content = response.getContent(); 
       if (content.readable()) { 
        final ChannelBuffer buf = BUFFER_MAP.get(channelID); 
        buf.writeBytes(content); 
        BUFFER_MAP.put(channelID, buf); 
        messageCompleted(e); 

       } 
      } 
     } else { 
      final HttpChunk chunk = (HttpChunk) e.getMessage(); 
      if (chunk.isLast()) { 
       CHUNKS_MAP.put(channelID, false); 
       messageCompleted(e); 
      } else { 
       final ChannelBuffer buf = BUFFER_MAP.get(channelID); 
       buf.writeBytes(chunk.getContent()); 
       BUFFER_MAP.put(channelID, buf); 
      } 
     } 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 
     e.getCause().printStackTrace(); 
     latch.countDown(); 
     e.getChannel().close(); 
    } 

    private void messageCompleted(MessageEvent e) { 
     final Integer channelID = e.getChannel().getId(); 
     if (queue.isCancelled()) { 
      return; 
     } 

     try { 
      final ImagePack p = PACK_MAP.get(channelID); 
      final ChannelBuffer b = BUFFER_MAP.get(channelID); 

      p.setBuffer(new ByteArrayInputStream(b.array())); 
      queue.put(p.getTile()); 
     } catch (Exception ex) { 
      LOGGER.log(Level.WARNING, ex.getMessage(), ex); 
     } 
     latch.countDown(); 
     e.getChannel().close(); 
    } 
} 

Il mio problema è che quando eseguo questo codice, ho queste eccezioni:

java.lang.IllegalArgumentException: invalid version format: 3!}@ 
    at org.jboss.netty.handler.codec.http.HttpVersion.<init>(HttpVersion.java:108) 
    at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68) 
    at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110) 
    at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261) 
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351) 
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282) 
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
    at java.lang.Thread.run(Thread.java:662) 

java.lang.IllegalArgumentException: invalid version format: 
    at org.jboss.netty.handler.codec.http.HttpVersion.<init>(HttpVersion.java:108) 
    at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68) 
    at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110) 
    at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.cleanup(ReplayingDecoder.java:546) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.channelDisconnected(ReplayingDecoder.java:449) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) 
    at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360) 
    at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595) 
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101) 
    at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82) 
    at org.jboss.netty.channel.Channels.close(Channels.java:720) 
    at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200) 
    at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) 
    at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432) 
    at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261) 
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351) 
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282) 
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
    at java.lang.Thread.run(Thread.java:662) 

22 mars 2012 15:27:31 org.jboss.netty.channel.DefaultChannelPipeline 
ATTENTION: An exception was thrown by a user handler while handling an exception event ([id: 0x3cd16610, /172.16.30.91:34315 :> tile.openstreetmap.org/193.63.75.98:80] EXCEPTION: java.lang.IllegalArgumentException: invalid version format: 
java.lang.IllegalStateException: An Executor cannot be shut down from the thread acquired from itself. Please make sure you are not calling releaseExternalResources() from an I/O worker thread. 
    at org.jboss.netty.util.internal.ExecutorUtil.terminate(ExecutorUtil.java:71) 
    at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.releaseExternalResources(NioClientSocketChannelFactory.java:171) 
    at org.jboss.netty.bootstrap.Bootstrap.releaseExternalResources(Bootstrap.java:324) 
    at org.geotoolkit.client.map.CachedPyramidSet$1.countDown(CachedPyramidSet.java:314) 
    at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:514) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) 
    at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432) 
    at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52) 
    at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360) 
    at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595) 
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101) 
    at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82) 
    at org.jboss.netty.channel.Channels.close(Channels.java:720) 
    at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200) 
    at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) 
    at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432) 
    at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261) 
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351) 
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282) 
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
    at java.lang.Thread.run(Thread.java:662) 

E anche qualche NPE appare alcune volte.

java.lang.NullPointerException 
    at org.jboss.netty.handler.codec.http.HttpMessageDecoder.skipControlCharacters(HttpMessageDecoder.java:409) 
    at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:184) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261) 
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351) 
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282) 
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 

Tutti questi codici funzionano bene per una richiesta, ma alcune cose strane si aggiungono ai buffer quando vengono richieste molte richieste.

Qualche idea di cosa mi manca qui? Grazie.

Nella mia prima versione, duplica il bootstrap/handler per ogni immagine richiesta, funziona bene ma non è molto ottimizzato.

risposta

5

Il problema è che si condivide un singolo HttpClientCodec tra tutti i canali. La pipeline predefinita specificata nel bootstrap è clonata per tutti i canali, quindi ogni canale vede la stessa istanza di ogni gestore. I codec http sono stateful quindi stai vedendo gli effetti di diverse risposte mescolate insieme.

La soluzione più semplice è passare un ChannelPipelineFactory al bootstrap. Questo sarà chiamato per ogni nuovo canale ed è possibile creare una pipeline con nuove istanze di HttpClientCodec. Non c'è niente che ti impedisca di utilizzare la stessa istanza di TileClientHandler per ogni pipeline che crei se è così che si intende lavorare.

Sono curioso però.Dato che stai facendo ogni richiesta contemporaneamente, non sarebbe più semplice aggiungere HttpChunkAggregator a monte di HttpClientCodec e lasciare che Netty aggreghi tutti i blocchi in un singolo HttpResponse. Quindi prendi il contenuto riassemblato da lì?

+0

Ciao johnstlr, grazie per questa rapida e utile risposta, ora utilizzo un ChannelPipelineFactory per istanziare l'HTTPCodec dai miei gestori di Tile. Funziona bene, ma ho ancora 'java.lang.IllegalStateException: un Executor non può essere spento dal thread acquisito da se stesso. Assicurati di non chiamare releaseExternalResources() da un'eccezione di thread I/O worker !. Hai avuto un'idea per questo? E per le informazioni, la ragione per cui non ho usato un HttpChunkAggregator è che devi impostare una dimensione del buffer per il costruttore HttpChunkAggregator. – qboileau

+1

Stai chiamando bootstrap.releaseExternalResources da CountDownLatch.countDown, che viene chiamato da un thread IO nei metodi del gestore. Sfortunatamente non puoi farlo. È necessario chiamare releaseExternalResources da un thread che non è in un pool di thread utilizzato da Netty. Un'opzione potrebbe essere quella di chiamare releaseExternalResources nel thread che sta leggendo dalla coda interna una volta terminata l'elaborazione della coda. Inoltre, hai completamente ragione su HttpChunkAggregator. Scusate! – johnstlr

Problemi correlati