的InputStream
其他API有readpacket()方法
如果庫有readPacket方法,你可以結合一個ReplayingDecoder
使用ByteBufInputStream
,這是比較容易實現:
public class RPCInputHandler extends ReplayingDecoder<Object> {
RPC upstream = ....;
protected void decode(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
upstream.readPacket(new ByteBufInputStream(buf));
state(null);
}
}
遠程API使用線程來讀取流
如果您的上游庫使用單獨的線程處理傳入的消息,您將失去Netty的主要優點之一:對於大量連接而言,線程數較少。
public class RPCInputHandler extends SimpleChannelInboundHandler<ByteBuf> {
RPC upstream = ....;
PipedInputStream in;
PipedOutputStream out;
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
in = new PipedInputStream();
out = new PipedOutputStream(in);
upstream.startInput(in);
}
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
out.close(); // This sends EOF to the other pipe
}
// This method is called messageReceived(ChannelHandlerContext, I) in 5.0.
public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] data = new byte[msg.readableBytes()];
msg.readBytes(data);
out.write(data);
}
}
的OutputStream
製作寫入字節到我們的連接定製的OutputStream簡單,最直接的方法映射在
public class OutputStreamNetty extends OutputStream {
final Channel channel = ...;
ChannelFuture lastFuture;
private void checkFuture() throws IOException {
if(lastFuture.isDone()) {
if(lastFuture.cause() != null) {
throw new IOException("Downstream write problem", lastFuture.cause());
}
lastFuture = null;
}
}
private void addFuture(ChannelFuture f) {
if(lastFuture == null) {
lastFuture = f;
}
}
public void close() throws IOException {
checkFuture()
addFuture(channel.close());
}
public void flush() throws IOException {
checkFuture()
addFuture(channel.flush());
}
public void write(byte[] b, int off, int len) throws IOException {
checkFuture()
Bytebuf f = channel.alloc().buffer(len);
f.writeBytes(b, off, len);
addFuture(channel.write(f));
}
public abstract void write(int b) throws IOException {
checkFuture()
Bytebuf f = channel.alloc().buffer(1);
f.writeByte(b);
addFuture(channel.write(f));
}
}
嗨。感謝您的快速回復。 RPC lib使用線程來讀取流。所以我們會在這裏失去網絡優勢。這是否也意味着我們失去netty的優勢,可能只是使用java.io(所以沒有nio)。還是有一些理由使用Netty? –
你會失去Netty的線程部分嗎,你仍然可以使用'EmbeddedChannel'來快速單元測試實現,用'SSLContext'封裝它來實現對協議的SSL支持,並且包含一個'AbstractRemoteAddressFilter'來支持一個簡單的防火牆,甚至創建自定義處理程序,以便它可以通過HTTP協議工作,但使用java套接字將提供更高的性能,因爲數據在使用管道流時不需要傳遞線程邊界。明智地選擇您的未來需求。 – Ferrybig