2017-08-21 52 views
2

我開發了一個netty http服務器,但是當我在方法ChannelInboundHandlerAdapter.channelRead0中寫入響應時,我的響應結果來自另一臺服務器,結果的大小未知,所以它的http響應頭可能具有內容長度或分塊。所以我使用緩衝區,如果它足夠了(讀取完整的數據),無論內容長度或分塊,我使用內容長度,否則我使用chunked。Netty異步寫入響應和大數據未知的大小

  1. 如何保存第一個連接的寫通道,然後將它傳遞給第二個處理程序,以便寫入響應。 (我只是直接傳遞CTX寫,但沒有返回)

  2. 如何有條件地決定寫入組塊的數據信道或內容長度正常的數據(它似乎沒有工作添加ChunkWriteHandler如果需要塊時channelRead0。

舉一個簡單的代碼,例如:

```的java

EventLoopGroup bossGroup = new NioEventLoopGroup(); 
    final EventLoopGroup workerGroup = new NioEventLoopGroup(); 

    try { 
     ServerBootstrap serverBootstrap = new ServerBootstrap(); 

     serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) 
      .handler(new LoggingHandler(LogLevel.INFO)) 
      .childHandler(new ChannelInitializer<Channel>(){ 

       @Override 
       protected void initChannel(Channel ch) throws Exception 
       { 
        System.out.println("Start, I accept client"); 
        ChannelPipeline pipeline = ch.pipeline(); 

        // Uncomment the following line if you want HTTPS 
        // SSLEngine engine = 
        // SecureChatSslContextFactory.getServerContext().createSSLEngine(); 
        // engine.setUseClientMode(false); 
        // pipeline.addLast("ssl", new SslHandler(engine)); 

        pipeline.addLast("decoder", new HttpRequestDecoder()); 
        // Uncomment the following line if you don't want to handle HttpChunks. 
        // pipeline.addLast("aggregator", new HttpChunkAggregator(1048576)); 
        pipeline.addLast("encoder", new HttpResponseEncoder()); 
        // Remove the following line if you don't want automatic content 
        // compression. 
        //pipeline.addLast("aggregator", new HttpChunkAggregator(1048576)); 
        pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); 
        pipeline.addLast("deflater", new HttpContentCompressor()); 
        pipeline.addLast("handler", new SimpleChannelInboundHandler<HttpObject>(){ 

          @Override 
          protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception 
          { 
           System.out.println("msg=" + msg); 

           final ChannelHandlerContext ctxClient2Me = ctx; 

           // TODO: Implement this method 
           Bootstrap bs = new Bootstrap(); 
           try{ 
           //bs.resolver(new DnsAddressResolverGroup(NioDatagramChannel.class, DefaultDnsServerAddressStreamProvider.INSTANCE)); 
           //.option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE) 
           bs.resolver(DefaultAddressResolverGroup.INSTANCE); 
           }catch(Exception e){ 
            e.printStackTrace(); 
           } 

           bs.channel(NioSocketChannel.class); 
           EventLoopGroup cg = workerGroup;//new NioEventLoopGroup(); 
           bs.group(cg).handler(new ChannelInitializer<Channel>(){ 

             @Override 
             protected void initChannel(Channel ch) throws Exception 
             { 
              System.out.println("start, server accept me"); 
              // TODO: Implement this method 
              ch.pipeline().addLast("http-request-encode", new HttpRequestEncoder()); 
              ch.pipeline().addLast(new HttpResponseDecoder()); 
              ch.pipeline().addLast("http-res", new SimpleChannelInboundHandler<HttpObject>(){ 

                @Override 
                protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception 
                { 
                 // TODO: Implement this method 
                 System.out.println("target = " + msg); 
                 // 
                 if(msg instanceof HttpResponse){ 
                  HttpResponse res = (HttpResponse) msg; 
                  HttpUtil.isTransferEncodingChunked(res); 
                  DefaultHttpResponse resClient2Me = new DefaultHttpResponse(HttpVersion.HTTP_1_1, res.getStatus()); 

                  //resClient2Me.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); 
                  //resClient2Me.headers().set(HttpHeaderNames.CONTENT_LENGTH, ""); 

                  ctxClient2Me.write(resClient2Me); 
                 } 
                 if(msg instanceof LastHttpContent){ 
                  // now response the request of the client, it wastes x seconds from receiving request to response 
                  ctxClient2Me.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE); 
                  ctx.close(); 
                 }else if(msg instanceof HttpContent){ 
                  //ctxClient2Me.write(new DefaultHttpContent(msg)); write chunk by chunk ? 
                 } 
                } 


               }); 

              System.out.println("end, server accept me"); 

             } 

           }); 

           final URI uri = new URI("http://example.com/"); 
           String host = uri.getHost(); 
           ChannelFuture connectFuture= bs.connect(host, 80); 

           System.out.println("to connect me to server"); 

           connectFuture.addListener(new ChannelFutureListener(){ 

             @Override 
             public void operationComplete(ChannelFuture cf) throws Exception 
             { 
             } 

           }); 


           ChannelFuture connetedFuture = connectFuture.sync(); // TODO optimize, wait io 
           System.out.println("connected me to server"); 

           DefaultFullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath()); 
           //req.headers().set(HttpHeaderNames.HOST, ""); 
           connetedFuture.channel().writeAndFlush(req); 

           System.out.println("end of Client2Me channelRead0"); 
           System.out.println("For the seponse of Me2Server, see SimpleChannelInboundHandler.channelRead0"); 
          } 

        }); 
        System.out.println("end, I accept client"); 
       } 

      }); 

      System.out.println("========"); 

     ChannelFuture channelFuture = serverBootstrap.bind(2080).sync(); 
     channelFuture.channel().closeFuture().sync(); 
    } finally { 
     bossGroup.shutdownGracefully(); 
     workerGroup.shutdownGracefully(); 
    } 

```

回答

0
  1. 查看評論有關Channel,這樣你就可以保留在ChannelInboundHandlerAdapter.channelRead(ChannelHandlerContext ctx, Object msg)接收的頻道(MSG自動返回後沒有被釋放)或SimpleChannelInboundHandler.channelRead0(ChannelHandlerContext ctx, I msg)(它返回後自動釋放收到的消息),以備後用。也許你可以參考最後的例子,將頻道傳遞給另一個ChannelHandler

所有的I/O操作是異步的。

Netty中的所有I/O操作都是異步的。這意味着任何I/O調用都將立即返回,不保證在呼叫結束時所請求的I/O操作已完成。相反,您將返回一個ChannelFuture實例,該實例將在請求的I/O操作成功,失敗或取消時通知您。

public interface Channel extends AttributeMap, Comparable<Channel> { 

    /** 
    * Request to write a message via this {@link Channel} through the {@link ChannelPipeline}. 
    * This method will not request to actual flush, so be sure to call {@link #flush()} 
    * once you want to request to flush all pending data to the actual transport. 
    */ 
    ChannelFuture write(Object msg); 

    /** 
    * Request to write a message via this {@link Channel} through the {@link ChannelPipeline}. 
    * This method will not request to actual flush, so be sure to call {@link #flush()} 
    * once you want to request to flush all pending data to the actual transport. 
    */ 
    ChannelFuture write(Object msg, ChannelPromise promise); 

    /** 
    * Request to flush all pending messages. 
    */ 
    Channel flush(); 

    /** 
    * Shortcut for call {@link #write(Object, ChannelPromise)} and {@link #flush()}. 
    */ 
    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise); 

    /** 
    * Shortcut for call {@link #write(Object)} and {@link #flush()}. 
    */ 
    ChannelFuture writeAndFlush(Object msg); 
} 
  • 沒有必要擔心這個,如果你已經添加HttpResponseEncoder(它是HttpObjectEncoder一個子類,其中有一個私人提起private int state = ST_INIT;記得是否編碼HTTP身體數據分塊)到ChannelPipeline,唯一要做的就是添加一個頭'transfer-encoding:chunked',例如HttpUtil.setTransferEncodingChunked(srcRes, true);
  • ```Java的

    public class NettyToServerChat extends SimpleChannelInboundHandler<HttpObject> { 
        private static final Logger LOGGER = LoggerFactory.getLogger(NettyToServerChat.class); 
        public static final String CHANNEL_NAME = "NettyToServer"; 
    
        protected final ChannelHandlerContext ctxClientToNetty; 
        /** Determines if the response supports keepalive */ 
        private boolean responseKeepalive = true; 
        /** Determines if the response is chunked */ 
        private boolean responseChunked = false; 
    
        public NettyToServerChat(ChannelHandlerContext ctxClientToNetty) { 
        this.ctxClientToNetty = ctxClientToNetty; 
        } 
    
        @Override 
        protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { 
         if (msg instanceof HttpResponse) { 
         HttpResponse response = (HttpResponse) msg; 
    
         HttpResponseStatus resStatus = response.status(); 
         //LOGGER.info("Status Line: {} {} {}", response.getProtocolVersion(), resStatus.code(), resStatus.reasonPhrase()); 
    
         if (!response.headers().isEmpty()) { 
         for (CharSequence name : response.headers().names()) { 
          for (CharSequence value : response.headers().getAll(name)) { 
          //LOGGER.info("HEADER: {} = {}", name, value); 
          } 
         } 
         //LOGGER.info(""); 
         } 
         //response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); 
    
         HttpResponse srcRes = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); 
         if (HttpUtil.isTransferEncodingChunked(response)) { 
         responseChunked = true; 
         HttpUtil.setTransferEncodingChunked(srcRes, true); 
         ctxNettyToServer.channel().write(srcRes); 
         //ctx.channel().pipeline().addAfter(CHANNEL_NAME, "ChunkedWrite", new ChunkedWriteHandler()); 
         } else { 
         ctxNettyToServer.channel().write(srcRes); 
         //ctx.channel().pipeline().remove("ChunkedWrite"); 
         } 
        } 
    
        if (msg instanceof LastHttpContent) { // prioritize the subclass interface 
         ctx.close(); 
         LOGGER.debug("ctxNettyToServer.channel().isWritable() = {}", ctxNettyToServer.channel().isWritable()); 
         Thread.sleep(3000); 
         LOGGER.debug("ctxNettyToServer.channel().isWritable() = {}", ctxNettyToServer.channel().isWritable()); 
    
         if(!responseChunked){ 
         HttpContent content = (HttpContent) msg; 
         // https://github.com/netty/netty/blob/4.1/transport/src/main/java/io/netty/channel/SimpleChannelInboundHandler.java 
         // @see {@link SimpleChannelInboundHandler<I>#channelRead(ChannelHandlerContext, I)} 
         ctxNettyToServer.writeAndFlush(content.retain()).addListener(ChannelFutureListener.CLOSE); 
         }else{ 
         ctxNettyToServer.close(); 
         } 
         LOGGER.debug("ctxNettyToServer.channel().isWritable() = {}", ctxNettyToServer.channel().isWritable()); 
        } else if (msg instanceof HttpContent) { 
         HttpContent content = (HttpContent) msg; 
         // We need to do a ReferenceCountUtil.retain() on the buffer to increase the reference count by 1 
         ctxNettyToServer.write(content.retain()); 
        } 
        } 
    } 
    

    ```

    0

    有點掙扎試圖從非Netty的發送響應事件循環線程後,我終於想通了這個問題。如果您的客戶端關閉使用

    socketChannel.shutdownOutput() 
    

    ,那麼你需要設置ALLOW_HALF_CLOSURE財產真正Netty中,這樣就不會關閉通道的輸出流。 以下是一個示例服務器。客戶端是作爲練習留給讀者:-)

    final ServerBootstrap b = new ServerBootstrap(); 
    
        b.group(bossGroup, workerGroup) 
          .channel(NioServerSocketChannel.class) 
          .option(ChannelOption.SO_KEEPALIVE, true) 
          .option(ChannelOption.ALLOW_HALF_CLOSURE, true)   // This option doesn't work 
          .handler(new LoggingHandler(LogLevel.INFO)) 
          .childHandler(new ChannelInitializer<io.netty.channel.socket.SocketChannel>() { 
           @Override 
           protected void initChannel(io.netty.channel.socket.SocketChannel ch) throws Exception { 
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { 
             @Override 
             public void channelRegistered(ChannelHandlerContext ctx) throws Exception { 
              ctx.channel().config().setOption(ChannelOption.ALLOW_HALF_CLOSURE, true);  // This is important 
             } 
    
             @Override 
             public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
              ByteBuffer byteBuffer = ((ByteBuf) msg).nioBuffer(); 
              String id = ctx.channel().id().asLongText(); 
    
              // When Done reading all the bytes, send response 1 second later 
              timer.schedule(new TimerTask() { 
               @Override 
               public void run() { 
                ctx.write(Unpooled.copiedBuffer(CONTENT.asReadOnlyBuffer())); 
                ctx.flush(); 
                ctx.close(); 
    
                log.info("[{}] Server time to first response byte: {}", id, System.currentTimeMillis() - startTimes.get(id)); 
                startTimes.remove(id); 
               } 
              }, 1000); 
             } 
            } 
           } 
          }); 
        Channel ch = b.bind("localhost", PORT).sync().channel(); 
        ch.closeFuture().sync(); 
    

    Ofcourse,如由他人在線程提到的,你不能發送字符串,則需要使用發送ByteBuf Unpooled.copiedBuffer