2015-12-30 67 views
3

我想實現一個「thumbnail generator」作爲微服務。我認爲這樣的事情可能最適合作爲TCP服務器,因此在簡要調查了幾個我選擇Netty的選項之後。爲了使服務儘可能節省內存,我寧願避免將完整的圖像加載到內存中,因此一直在嘗試構建一個「ThumbnailHandler」可以使用管道流來利用Netty分塊閱讀的管道,以便當Netty接收更多字節時,縮略圖生成器可以遍歷更多的流。不幸的是,我對Netty或NIO模式一般不太瞭解,不知道我是否正以最佳方式進行討論,而且即使簡化版本也無法像我期望的那樣工作。用Netty管道流式傳輸

這是我的服務器設置:

public class ThumbnailerServer { 

    private int port; 

    public ThumbnailerServer(int port) { 
     this.port = port; 
    } 

    public void run() throws Exception { 
     final ThreadFactory acceptFactory = new DefaultThreadFactory("accept"); 
     final ThreadFactory connectFactory = new DefaultThreadFactory("connect"); 
     final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.BYTE_PROVIDER); 
     final NioEventLoopGroup connectGroup = new NioEventLoopGroup(0, connectFactory, NioUdtProvider.BYTE_PROVIDER); 

     try { 
      ServerBootstrap b = new ServerBootstrap(); 

      b.group(acceptGroup, connectGroup) 
      .channelFactory(NioUdtProvider.BYTE_ACCEPTOR) 
      .option(ChannelOption.SO_BACKLOG, 128) 
      .handler(new LoggingHandler(LogLevel.INFO)) 
      .childHandler(new ChannelInitializer<UdtChannel>() { 
       @Override 
       public void initChannel(UdtChannel ch) throws Exception { 
        ChannelPipeline p = ch.pipeline(); 
        p.addLast("handler", new ThumbnailerServerHandler()); 
       } 
      }); 

      // bind and start to accept incoming connections. 
      b.bind(port).sync().channel().closeFuture().sync(); 
     } finally { 
      connectGroup.shutdownGracefully(); 
      acceptGroup.shutdownGracefully(); 
     } 
    } 

} 

和縮略圖處理程序:

public class ThumbnailerServerHandler extends SimpleChannelInboundHandler<ByteBuf> { 

    private static final Logger logger = LoggerFactory.getLogger(ThumbnailerServerHandler.class); 
    private PipedInputStream toThumbnailer = new PipedInputStream(); 
    private PipedOutputStream fromClient = new PipedOutputStream(toThumbnailer); 

    private static final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
      Executors.newFixedThreadPool(5)); 

    private ListenableFuture<OutputStream> future; 

    public ThumbnailerServerHandler() throws IOException { 
     super(ByteBuf.class, true); 
    } 

    @Override 
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 
     future = executor.submit(() -> ThumbnailGenerator.generate(toThumbnailer)); 
     future.addListener(() -> { 
      try { 
       ctx.writeAndFlush(future.get()); 
      } catch (InterruptedException | ExecutionException e) { 
       e.printStackTrace(); 
      } 
     }, executor); 
    } 

    @Override 
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { 
     this.fromClient.close(); 
     this.toThumbnailer.close(); 
    } 

    @Override 
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 
     int readableBytes = msg.readableBytes(); 
     msg.readBytes(this.fromClient, readableBytes); 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
     logger.error("Encountered error during communication", cause); 
     ctx.close(); 
    } 


} 

這裏是我的簡化 「thumbnailer的」 直到我得到整個流程的工作:

public class ThumbnailGenerator { 

    public static OutputStream generate(InputStream toThumbnailer) { 
     OutputStream stream = new ByteArrayOutputStream(); 
     try { 
      IOUtils.copy(toThumbnailer, stream); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
     return stream; 
    } 

} 
  1. 是否適合分拆一個異步任務這樣的handlerAdded方法?有沒有更「網」的方式來做到這一點?
  2. 的IOUtils.copy應該並執行塊(由於在一個管道輸入流的讀),直到有可供讀取數據被爲什麼我卸載成執行人池,因爲我不能在處理程序方框如果我想繼續接收字節。然而,我發現這個從未完成,但它確實取得進展。那是因爲我從來沒有遇到EOF字節(-1)?我怎樣才能使這個流程工作?
  3. 我在netty中遺漏了一個可以簡化這個過程的構造嗎?我想過把它作爲一個解碼器來實現,它只有在整個流完成時才解碼,但是我會將所有內容都加載到內存中。

回答

2

好的,所以事實證明我有一些誤解,這就解釋了爲什麼我無法正常工作。

1)許多文件類型沒有所謂的終端字節。事實上,EOF字節(最常見的是-1,因爲它是一個溢出值)通常是由讀者提供的一種實現,以向消費者傳達他們已經達到內容的結尾。它通常不在文件本身中存在。

2)channelReadComplete不那麼明朗,因爲它的聲音。 channelReadComplete在netty中配置的最大讀取次數(默認爲10)之後被調用,或者當它有理由相信一個消息已被完全發送,如讀取一個空緩衝區或接收到一個小於配置塊大小。

至於爲什麼它似乎是輸入流的副本被掛,以及這是因爲管道輸入流從不產生的終值(這是讀取器 - 實施一個EOF字節的一個例子)。一旦驅動它們的輸出流已經關閉,PipedInputStreams只會指示EOF。

爲了使這一實施工作,我應該調升消息計數到足夠高的數字和信任channelReadComplete到返回小於塊大小的最後讀取後調用。在這一點上,關閉並重置下一條消息的輸出流是安全的。關閉輸出流會導致輸入流最終返回EOF字節和其他所有內容。