2016-01-29 43 views
1

我必須爲RPC通信創建TCP/IP服務器。我必須使用提供的java庫來處理所有「rpc」的東西。這個庫接收到包含protobuf數據的HDLC消息。 lib本身使用請求和響應處理程序來處理HDLC和protobuf部分。這個庫可以用於串行連接以及網絡連接。如何將接收到的數據轉換爲輸入流和輸出流

我們希望爲此使用netty作爲TCP服務器。在調用這個庫時,它需要在「RPC」方法中使用java.io.inputstreamjava.io.outputstream

我有一個簡單的阻塞設置,我在其中創建一個服務器套接字,並將socket.getInputStream()socket.getOutputStream()傳遞給RPC方法。接下來,我需要向此rpc對象註冊(設置)rpc處理程序,並且客戶端可以連接併發送數據。對我來說似乎很簡單。

我也設置了netty「echo」服務器,現在我想用netty使用這個RPC庫。我正在努力的是如何將我收到的數據轉換爲所需的InputStream以及如何轉換RPC lib的OutputStream,以便它可以發送回客戶端。我需要一個解碼器/編碼器,還是有更簡單的方法來做到這一點?如果是這樣,我如何將ByteBuf轉換爲InputStream,並將OutputStream轉換回可以通過網絡發送的格式?

回答

1

的InputStream

其他API有readpacket()方法

如果庫有readPacket方法,你可以結合一個ReplayingDecoder使用ByteBufInputStream,這是比較容易實現:

public class RPCInputHandler extends ReplayingDecoder<Object> { 
    RPC upstream = ....; 
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf) throws Exception { 
     upstream.readPacket(new ByteBufInputStream(buf)); 
     state(null); 
    } 
} 

遠程API使用線程來讀取流

如果您的上游庫使用單獨的線程處理傳入的消息,您將失去Netty的主要優點之一:對於大量連接而言,線程數較少。

public class RPCInputHandler extends SimpleChannelInboundHandler<ByteBuf> { 
    RPC upstream = ....; 
    PipedInputStream in; 
    PipedOutputStream out; 

    @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { 
     in = new PipedInputStream(); 
     out = new PipedOutputStream(in); 
     upstream.startInput(in); 
    } 

    @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { 
     out.close(); // This sends EOF to the other pipe 
    } 

    // This method is called messageReceived(ChannelHandlerContext, I) in 5.0. 
    public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 
     byte[] data = new byte[msg.readableBytes()]; 
     msg.readBytes(data); 
     out.write(data); 
    } 
} 

的OutputStream

製作寫入字節到我們的連接定製的OutputStream簡單,最直接的方法映射在

public class OutputStreamNetty extends OutputStream { 
    final Channel channel = ...; 
    ChannelFuture lastFuture; 

    private void checkFuture() throws IOException { 
     if(lastFuture.isDone()) { 
      if(lastFuture.cause() != null) { 
       throw new IOException("Downstream write problem", lastFuture.cause()); 
      } 
      lastFuture = null; 
     } 
    } 

    private void addFuture(ChannelFuture f) { 
     if(lastFuture == null) { 
      lastFuture = f; 
     } 
    } 

    public void close() throws IOException { 
     checkFuture() 
     addFuture(channel.close()); 
    } 
    public void flush() throws IOException { 
     checkFuture() 
     addFuture(channel.flush()); 
    } 
    public void write(byte[] b, int off, int len) throws IOException { 
     checkFuture() 
     Bytebuf f = channel.alloc().buffer(len); 
     f.writeBytes(b, off, len); 
     addFuture(channel.write(f)); 
    } 
    public abstract void write(int b) throws IOException { 
     checkFuture() 
     Bytebuf f = channel.alloc().buffer(1); 
     f.writeByte(b); 
     addFuture(channel.write(f)); 
    } 

} 
+0

嗨。感謝您的快速回復。 RPC lib使用線程來讀取流。所以我們會在這裏失去網絡優勢。這是否也意味着我們失去netty的優勢,可能只是使用java.io(所以沒有nio)。還是有一些理由使用Netty? –

+0

你會失去Netty的線程部分嗎,你仍然可以使用'EmbeddedChannel'來快速單元測試實現,用'SSLContext'封裝它來實現對協議的SSL支持,並且包含一個'AbstractRemoteAddressFilter'來支持一個簡單的防火牆,甚至創建自定義處理程序,以便它可以通過HTTP協議工作,但使用java套接字將提供更高的性能,因爲數據在使用管道流時不需要傳遞線程邊界。明智地選擇您的未來需求。 – Ferrybig