我開發了一個netty http服務器,但是當我在方法ChannelInboundHandlerAdapter.channelRead0中寫入響應時,我的響應結果來自另一臺服務器,結果的大小未知,所以它的http響應頭可能具有內容長度或分塊。所以我使用緩衝區,如果它足夠了(讀取完整的數據),無論內容長度或分塊,我使用內容長度,否則我使用chunked。Netty異步寫入響應和大數據未知的大小
如何保存第一個連接的寫通道,然後將它傳遞給第二個處理程序,以便寫入響應。 (我只是直接傳遞CTX寫,但沒有返回)
如何有條件地決定寫入組塊的數據信道或內容長度正常的數據(它似乎沒有工作添加ChunkWriteHandler如果需要塊時channelRead0。
舉一個簡單的代碼,例如:
```的java
EventLoopGroup bossGroup = new NioEventLoopGroup();
final EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<Channel>(){
@Override
protected void initChannel(Channel ch) throws Exception
{
System.out.println("Start, I accept client");
ChannelPipeline pipeline = ch.pipeline();
// Uncomment the following line if you want HTTPS
// SSLEngine engine =
// SecureChatSslContextFactory.getServerContext().createSSLEngine();
// engine.setUseClientMode(false);
// pipeline.addLast("ssl", new SslHandler(engine));
pipeline.addLast("decoder", new HttpRequestDecoder());
// Uncomment the following line if you don't want to handle HttpChunks.
// pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
pipeline.addLast("encoder", new HttpResponseEncoder());
// Remove the following line if you don't want automatic content
// compression.
//pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
pipeline.addLast("deflater", new HttpContentCompressor());
pipeline.addLast("handler", new SimpleChannelInboundHandler<HttpObject>(){
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception
{
System.out.println("msg=" + msg);
final ChannelHandlerContext ctxClient2Me = ctx;
// TODO: Implement this method
Bootstrap bs = new Bootstrap();
try{
//bs.resolver(new DnsAddressResolverGroup(NioDatagramChannel.class, DefaultDnsServerAddressStreamProvider.INSTANCE));
//.option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
bs.resolver(DefaultAddressResolverGroup.INSTANCE);
}catch(Exception e){
e.printStackTrace();
}
bs.channel(NioSocketChannel.class);
EventLoopGroup cg = workerGroup;//new NioEventLoopGroup();
bs.group(cg).handler(new ChannelInitializer<Channel>(){
@Override
protected void initChannel(Channel ch) throws Exception
{
System.out.println("start, server accept me");
// TODO: Implement this method
ch.pipeline().addLast("http-request-encode", new HttpRequestEncoder());
ch.pipeline().addLast(new HttpResponseDecoder());
ch.pipeline().addLast("http-res", new SimpleChannelInboundHandler<HttpObject>(){
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception
{
// TODO: Implement this method
System.out.println("target = " + msg);
//
if(msg instanceof HttpResponse){
HttpResponse res = (HttpResponse) msg;
HttpUtil.isTransferEncodingChunked(res);
DefaultHttpResponse resClient2Me = new DefaultHttpResponse(HttpVersion.HTTP_1_1, res.getStatus());
//resClient2Me.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
//resClient2Me.headers().set(HttpHeaderNames.CONTENT_LENGTH, "");
ctxClient2Me.write(resClient2Me);
}
if(msg instanceof LastHttpContent){
// now response the request of the client, it wastes x seconds from receiving request to response
ctxClient2Me.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE);
ctx.close();
}else if(msg instanceof HttpContent){
//ctxClient2Me.write(new DefaultHttpContent(msg)); write chunk by chunk ?
}
}
});
System.out.println("end, server accept me");
}
});
final URI uri = new URI("http://example.com/");
String host = uri.getHost();
ChannelFuture connectFuture= bs.connect(host, 80);
System.out.println("to connect me to server");
connectFuture.addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture cf) throws Exception
{
}
});
ChannelFuture connetedFuture = connectFuture.sync(); // TODO optimize, wait io
System.out.println("connected me to server");
DefaultFullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath());
//req.headers().set(HttpHeaderNames.HOST, "");
connetedFuture.channel().writeAndFlush(req);
System.out.println("end of Client2Me channelRead0");
System.out.println("For the seponse of Me2Server, see SimpleChannelInboundHandler.channelRead0");
}
});
System.out.println("end, I accept client");
}
});
System.out.println("========");
ChannelFuture channelFuture = serverBootstrap.bind(2080).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
```