2017-01-09 63 views
2

我嘗試在 Netty 中建立一個代理：把資料轉發到遠端之後，再把同一個訊息繼續傳給管線中其餘的處理程序。我知道我應該管理 ByteBuf 的引用計數，但我不明白該怎麼做。我的範例程式碼和例外如下。（標題：Netty 中的 io.netty.util.IllegalReferenceCountException: refCnt: 0）

初始化器:

/**
 * Populates the pipeline of each {@link SocketChannel} it initializes with an optional handler
 * created from the configured {@link SslContext}, followed by a
 * {@link HexDumpProxyFrontendHandler} and an {@link InboundPrinterHandler}.
 */
public class HexDumpProxyInitializer extends ChannelInitializer<SocketChannel> {

    /** SSL context used to create the first handler; {@code null} means no SSL handler is added. */
    private final SslContext sslCtx;

    /** Host handed to the {@link HexDumpProxyFrontendHandler}. */
    private final String remoteHost;

    /** Port handed to the {@link HexDumpProxyFrontendHandler}. */
    private final int remotePort;

    /**
     * Creates an initializer that adds no SSL handler.
     *
     * @param remoteHost host passed to the front-end handler
     * @param remotePort port passed to the front-end handler
     */
    public HexDumpProxyInitializer(String remoteHost, int remotePort) {
        this(null, remoteHost, remotePort);
    }

    /**
     * Creates an initializer.
     *
     * @param sslCtx     context used to create the SSL handler, or {@code null} to skip it
     * @param remoteHost host passed to the front-end handler
     * @param remotePort port passed to the front-end handler
     */
    public HexDumpProxyInitializer(SslContext sslCtx, String remoteHost, int remotePort) {
        this.sslCtx = sslCtx;
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }

    /**
     * Adds the handlers to the channel's pipeline in order: SSL handler (if configured),
     * front-end proxy handler, inbound printer.
     *
     * @param ch the channel being initialized
     */
    @Override
    public void initChannel(SocketChannel ch) {
        final ChannelPipeline pipeline = ch.pipeline();

        final boolean sslConfigured = sslCtx != null;
        if (sslConfigured) {
            pipeline.addLast(sslCtx.newHandler(ch.alloc()));
        }

        pipeline.addLast(new HexDumpProxyFrontendHandler(remoteHost, remotePort))
                .addLast(new InboundPrinterHandler());
    }
}

HexDumpProxyFrontendHandler

public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter { 

    private final String remoteHost; 
    private final int remotePort; 

    private Channel outboundChannel; 

    public HexDumpProxyFrontendHandler(String remoteHost, int remotePort) { 
     this.remoteHost = remoteHost; 
     this.remotePort = remotePort; 
    } 

    @Override 
    public void channelActive(ChannelHandlerContext ctx) { 
     final Channel inboundChannel = ctx.channel(); 

     // Start the connection attempt. 
     Bootstrap b = new Bootstrap(); 
     b.group(inboundChannel.eventLoop()) 
     .channel(ctx.channel().getClass()) 
     .handler(new HexDumpProxyBackendHandler(inboundChannel)) 
     .option(ChannelOption.AUTO_READ, false); 
     ChannelFuture f = b.connect(remoteHost, remotePort); 
     outboundChannel = f.channel(); 
     f.addListener(new ChannelFutureListener() { 
      @Override 
      public void operationComplete(ChannelFuture future) { 
       if (future.isSuccess()) { 
        // connection complete start to read first data 
        inboundChannel.read(); 
       } else { 
        // Close the connection if the connection attempt has failed. 
        inboundChannel.close(); 
       } 
      } 
     }); 
    } 

    @Override 
    public void channelRead(final ChannelHandlerContext ctx, Object msg) { 
     if (outboundChannel.isActive()) { 
      outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() { 
       @Override 
       public void operationComplete(ChannelFuture future) { 
        if (future.isSuccess()) { 
         // was able to flush out data, start to read the next chunk 
         ctx.channel().read(); 
        } else { 
         future.channel().close(); 
        } 
       } 
      }); 
     } 
     ctx.fireChannelRead(msg); 
    } 

    @Override 
    public void channelInactive(ChannelHandlerContext ctx) { 
     if (outboundChannel != null) { 
      closeOnFlush(outboundChannel); 
     } 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
     cause.printStackTrace(); 
     closeOnFlush(ctx.channel()); 
    } 

    static void closeOnFlush(Channel ch) { 
     if (ch.isActive()) { 
      ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); 
     } 
    } 
} 

InboundPrinterHandler

public class InboundPrinterHandler extends ChannelInboundHandlerAdapter { 


    @Override 
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
     ByteBuf bb = null; 
     bb = (ByteBuf) msg; 
     System.out.println("INBOUND:\n\n"+bb.toString(Charset.defaultCharset())); 
     System.out.println("\n\n\n"); 
    } 


    @Override 
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 

    } 


} 

異常

io.netty.util.IllegalReferenceCountException: refCnt: 0 
    at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1407) 
    at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1353) 
    at io.netty.buffer.PooledUnsafeDirectByteBuf.internalNioBuffer(PooledUnsafeDirectByteBuf.java:331) 
    at io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:614) 
    at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1213) 
    at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1208) 
    at com.netas.sv.proxy.InboundPrinterHandler.channelRead(InboundPrinterHandler.java:16) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) 
    at com.netas.sv.proxy.HexDumpProxyFrontendHandler.channelRead(HexDumpProxyFrontendHandler.java:67) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) 
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:574) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:488) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873) 
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) 
    at java.lang.Thread.run(Thread.java:745) 

回答

5
// Excerpt of the question's HexDumpProxyFrontendHandler.channelRead (listener body elided).
if (outboundChannel.isActive()) { 
     // msg is handed to the outbound channel here ...
     outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() { 
     // Snip 
    } 
    // ... and the very same msg is then passed on to the next inbound handler.
    ctx.fireChannelRead(msg); 

把您的 ByteBuf 寫入另一個通道之後，遞減其引用計數就成了那個通道的責任。由於另一個通道已經遞減了引用計數（此時降為 0），這個緩衝區現在已經不能再使用了。

解決這個問題最好的辦法，是在把資料傳給另一個通道之前，先使用 .retain() 手動遞增引用計數：

// msg is declared as Object in channelRead, so retain() cannot be called on it directly;
// ReferenceCountUtil.retain() increments the count for reference-counted messages and
// returns the message unchanged otherwise.
outboundChannel.writeAndFlush(io.netty.util.ReferenceCountUtil.retain(msg)).addListener(new ChannelFutureListener() {
// Your remaining code