2
標題:Netty 中的 io.netty.util.IllegalReferenceCountException: refCnt: 0
我正嘗試用 Netty 建立一個代理:先把收到的資料轉發到遠端,然後再把同一個訊息傳遞給管線(pipeline)中其餘的處理程序。我知道我應該管理 ByteBuf 的引用計數,但我無法理解具體該怎麼做。我的範例程式碼和拋出的例外如下。
初始化器:
/**
 * Channel initializer for accepted proxy connections.
 *
 * <p>For every new {@link SocketChannel} it installs, in order: an optional
 * SSL handler (only when an {@link SslContext} was supplied), a
 * {@link HexDumpProxyFrontendHandler} targeting the configured remote
 * endpoint, and an {@link InboundPrinterHandler}.
 */
public class HexDumpProxyInitializer extends ChannelInitializer<SocketChannel> {

    /** SSL context used to create the SSL handler; {@code null} disables SSL. */
    private final SslContext sslCtx;
    /** Host name of the remote endpoint handed to the frontend handler. */
    private final String remoteHost;
    /** Port of the remote endpoint handed to the frontend handler. */
    private final int remotePort;

    /**
     * Creates an initializer without SSL.
     *
     * @param remoteHost host of the remote endpoint
     * @param remotePort port of the remote endpoint
     */
    public HexDumpProxyInitializer(String remoteHost, int remotePort) {
        this(null, remoteHost, remotePort);
    }

    /**
     * Creates an initializer, optionally with SSL.
     *
     * @param sslCtx     SSL context, or {@code null} to skip the SSL handler
     * @param remoteHost host of the remote endpoint
     * @param remotePort port of the remote endpoint
     */
    public HexDumpProxyInitializer(SslContext sslCtx, String remoteHost, int remotePort) {
        this.sslCtx = sslCtx;
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }

    /**
     * Populates the pipeline of a newly registered channel.
     *
     * @param ch the channel being initialized
     */
    @Override
    public void initChannel(SocketChannel ch) {
        final ChannelPipeline pipeline = ch.pipeline();

        // The SSL handler, when present, must sit in front of the proxy handlers.
        if (sslCtx != null) {
            pipeline.addLast(sslCtx.newHandler(ch.alloc()));
        }

        pipeline.addLast(new HexDumpProxyFrontendHandler(remoteHost, remotePort))
                .addLast(new InboundPrinterHandler());
    }
}
HexDumpProxyFrontendHandler
public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {
private final String remoteHost;
private final int remotePort;
private Channel outboundChannel;
public HexDumpProxyFrontendHandler(String remoteHost, int remotePort) {
this.remoteHost = remoteHost;
this.remotePort = remotePort;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
final Channel inboundChannel = ctx.channel();
// Start the connection attempt.
Bootstrap b = new Bootstrap();
b.group(inboundChannel.eventLoop())
.channel(ctx.channel().getClass())
.handler(new HexDumpProxyBackendHandler(inboundChannel))
.option(ChannelOption.AUTO_READ, false);
ChannelFuture f = b.connect(remoteHost, remotePort);
outboundChannel = f.channel();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// connection complete start to read first data
inboundChannel.read();
} else {
// Close the connection if the connection attempt has failed.
inboundChannel.close();
}
}
});
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (outboundChannel.isActive()) {
outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// was able to flush out data, start to read the next chunk
ctx.channel().read();
} else {
future.channel().close();
}
}
});
}
ctx.fireChannelRead(msg);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
if (outboundChannel != null) {
closeOnFlush(outboundChannel);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
closeOnFlush(ctx.channel());
}
static void closeOnFlush(Channel ch) {
if (ch.isActive()) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
}
InboundPrinterHandler
/**
 * Terminal inbound handler that prints every received {@link ByteBuf} to
 * standard output, decoded with the platform default charset.
 *
 * <p>This handler consumes the buffers it prints: it does not forward them,
 * so it releases the reference it was given. Messages that are not a
 * {@link ByteBuf} are passed on to the next handler untouched.
 */
public class InboundPrinterHandler extends ChannelInboundHandlerAdapter {

    /**
     * Prints the content of {@code msg} and releases it.
     *
     * @param ctx the handler context
     * @param msg the inbound message; expected to be a {@link ByteBuf}
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof ByteBuf)) {
            // Not something this handler can print; let a later handler deal with it.
            ctx.fireChannelRead(msg);
            return;
        }
        ByteBuf bb = (ByteBuf) msg;
        try {
            // toString(Charset) decodes the readable bytes without moving readerIndex.
            System.out.println("INBOUND:\n\n" + bb.toString(Charset.defaultCharset()));
            System.out.println("\n\n\n");
        } finally {
            // The message stops here, so the reference this handler received
            // must be released to avoid leaking the (possibly pooled) buffer.
            bb.release();
        }
    }

    /** Intentionally empty: the read-complete event is not propagated further. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }
}
異常
io.netty.util.IllegalReferenceCountException: refCnt: 0
at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1407)
at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1353)
at io.netty.buffer.PooledUnsafeDirectByteBuf.internalNioBuffer(PooledUnsafeDirectByteBuf.java:331)
at io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:614)
at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1213)
at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1208)
at com.netas.sv.proxy.InboundPrinterHandler.channelRead(InboundPrinterHandler.java:16)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at com.netas.sv.proxy.HexDumpProxyFrontendHandler.channelRead(HexDumpProxyFrontendHandler.java:67)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:574)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:488)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)