我想實現一個「thumbnail generator」作爲微服務。我認爲這樣的事情可能最適合作爲TCP服務器,因此在簡要調查了幾個我選擇Netty的選項之後。爲了使服務儘可能節省內存,我寧願避免將完整的圖像加載到內存中,因此一直在嘗試構建一個「ThumbnailHandler」可以使用管道流來利用Netty分塊閱讀的管道,以便當Netty接收更多字節時,縮略圖生成器可以遍歷更多的流。不幸的是,我對Netty或NIO模式一般不太瞭解,不知道我是否正以最佳方式進行討論,而且即使簡化版本也無法像我期望的那樣工作。用Netty管道流式傳輸
這是我的服務器設置:
public class ThumbnailerServer {
private int port;
public ThumbnailerServer(int port) {
this.port = port;
}
public void run() throws Exception {
final ThreadFactory acceptFactory = new DefaultThreadFactory("accept");
final ThreadFactory connectFactory = new DefaultThreadFactory("connect");
final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.BYTE_PROVIDER);
final NioEventLoopGroup connectGroup = new NioEventLoopGroup(0, connectFactory, NioUdtProvider.BYTE_PROVIDER);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(acceptGroup, connectGroup)
.channelFactory(NioUdtProvider.BYTE_ACCEPTOR)
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<UdtChannel>() {
@Override
public void initChannel(UdtChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("handler", new ThumbnailerServerHandler());
}
});
// bind and start to accept incoming connections.
b.bind(port).sync().channel().closeFuture().sync();
} finally {
connectGroup.shutdownGracefully();
acceptGroup.shutdownGracefully();
}
}
}
和縮略圖處理程序:
public class ThumbnailerServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static final Logger logger = LoggerFactory.getLogger(ThumbnailerServerHandler.class);
private PipedInputStream toThumbnailer = new PipedInputStream();
private PipedOutputStream fromClient = new PipedOutputStream(toThumbnailer);
private static final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(5));
private ListenableFuture<OutputStream> future;
public ThumbnailerServerHandler() throws IOException {
super(ByteBuf.class, true);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
future = executor.submit(() -> ThumbnailGenerator.generate(toThumbnailer));
future.addListener(() -> {
try {
ctx.writeAndFlush(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}, executor);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
this.fromClient.close();
this.toThumbnailer.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
int readableBytes = msg.readableBytes();
msg.readBytes(this.fromClient, readableBytes);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("Encountered error during communication", cause);
ctx.close();
}
}
這裏是我的簡化 「thumbnailer的」 直到我得到整個流程的工作:
public class ThumbnailGenerator {
public static OutputStream generate(InputStream toThumbnailer) {
OutputStream stream = new ByteArrayOutputStream();
try {
IOUtils.copy(toThumbnailer, stream);
} catch (IOException e) {
e.printStackTrace();
}
return stream;
}
}
- 是否適合分拆一個異步任務這樣的handlerAdded方法?有沒有更「網」的方式來做到這一點?
- 的IOUtils.copy應該並執行塊(由於在一個管道輸入流的讀),直到有可供讀取數據被爲什麼我卸載成執行人池,因爲我不能在處理程序方框如果我想繼續接收字節。然而,我發現這個從未完成,但它確實取得進展。那是因爲我從來沒有遇到EOF字節(-1)?我怎樣才能使這個流程工作?
- 我在netty中遺漏了一個可以簡化這個過程的構造嗎?我想過把它作爲一個解碼器來實現,它只有在整個流完成時才解碼,但是我會將所有內容都加載到內存中。