我正在寫一個簡單的路由應用程序。我的想法是:由服務器(即源節點)接收持續 x 時間的短暫客戶端連接。接收到的消息會先被解碼,然後根據消息的內容發送到相應的匯聚節點(sink node),也就是已經打開的客戶端連接。路由器類(Router)會註冊所有通道,並把它們保存在 Map 中,以便過濾並找出消息的目的地。一旦確定了目的地,我應該能夠選擇實際的匯聚節點(根據配置,它可以是持久連接,也可以是短暫連接),把數據發送到該通道並等待響應,然後再把響應發送回始發者。我想知道,我使用 Netty 的實現方向是否正確?另外,如何把從任意服務器收到的消息傳遞給任意客戶端,並把響應回傳給始發的源節點?也就是說,在 Netty 中如何在通道之間傳遞數據?
下面是我的源代碼,它會(或者說應該)讓你大致了解我的想法。請在你的解釋中使用代碼示例。
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ChildChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
/*
* @author Kimathi
*/
/**
 * Application entry point: builds the {@link Nodes} aggregate, wires in the
 * source nodes, sink nodes and configuration, and boots everything.
 */
public class Service {

    /** Created by {@link #start()}; {@code null} until then. */
    private Nodes nodes;

    /**
     * Creates a fresh {@link Nodes} instance, registers new source nodes,
     * sink nodes and configurations on it, and boots it.
     */
    public void start(){
        nodes = new Nodes();
        nodes.addSourceNodes(new SourceNodes()).
              addSinkNodes(new SinkNodes()).
              addConfigurations(new Configurations()).
              boot();
    }

    /**
     * Stops the nodes created by {@link #start()}.
     * Does nothing when {@link #start()} has not been called yet, instead of
     * failing with a {@link NullPointerException}.
     */
    public void stop(){
        if (nodes != null) {
            nodes.stop();
        }
    }

    /**
     * Starts the service.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String [] args){
        new Service().start();
    }
}
/**
 * Groups the source side and the sink side of the router together with the
 * shared configuration, and drives their lifecycle as one unit.
 */
class Nodes {

    private SourceNodes sourcenodes;
    private SinkNodes sinknodes;
    private Configurations configurations;

    /**
     * Registers the configuration handed to the {@link Router} on boot.
     *
     * @param configurations configuration object; stored as-is
     * @return this instance, for chaining
     */
    public Nodes addConfigurations(Configurations configurations){
        this.configurations = configurations;
        return this;
    }

    /**
     * Registers the source nodes.
     *
     * @param sourcenodes source side to boot and stop
     * @return this instance, for chaining
     */
    public Nodes addSourceNodes(SourceNodes sourcenodes){
        this.sourcenodes = sourcenodes;
        return this;
    }

    /**
     * Registers the sink nodes.
     *
     * @param sinknodes sink side to boot and stop
     * @return this instance, for chaining
     */
    public Nodes addSinkNodes(SinkNodes sinknodes){
        this.sinknodes = sinknodes;
        return this;
    }

    /**
     * Creates one {@link Router} shared by both sides, then boots the source
     * nodes on ports 8000-8002 followed by the sink nodes targeting
     * 127.0.0.1:6000-6002.
     *
     * @throws IllegalStateException if the source or sink nodes have not been
     *         added; checked up front so that nothing is started when the
     *         setup is incomplete
     */
    public void boot(){
        if (sourcenodes == null || sinknodes == null) {
            throw new IllegalStateException(
                    "source nodes and sink nodes must be added before boot()");
        }

        Router router = new Router(configurations);

        sourcenodes.addPort(8000).
                    addPort(8001).
                    addPort(8002);
        sourcenodes.addRouter(router);
        sourcenodes.boot();

        sinknodes.addRemoteAddress("127.0.0.1", 6000).
                  addRemoteAddress("127.0.0.1", 6001).
                  addRemoteAddress("127.0.0.1", 6002);
        sinknodes.addRouter(router);
        sinknodes.boot();
    }

    /**
     * Stops the source nodes and then the sink nodes. Sides that were never
     * added are skipped, and the sink side is stopped even when stopping the
     * source side throws.
     */
    public void stop(){
        try {
            if (sourcenodes != null) {
                sourcenodes.stop();
            }
        } finally {
            if (sinknodes != null) {
                sinknodes.stop();
            }
        }
    }
}
/**
 * Server side of the router: binds a {@link ServerBootstrap} to every
 * registered port and installs a {@link SourceHandler} in each pipeline the
 * bootstrap creates.
 */
final class SourceNodes implements Bootable , Routable {

    /** Local ports to bind, in registration order. */
    private final List<Integer> ports = new ArrayList<Integer>();

    /** Created by {@link #boot()}; {@code null} until then. */
    private ServerBootstrap serverbootstrap;

    private Router router;

    /**
     * Sets the router passed to every {@link SourceHandler} created afterwards.
     *
     * @param router router shared with the handlers
     */
    @Override
    public void addRouter(final Router router){
        this.router = router;
    }

    /**
     * Registers a port to bind on {@link #boot()}.
     *
     * @param port local port number
     * @return this instance, for chaining
     */
    public SourceNodes addPort(int port){
        this.ports.add(port);
        return this;
    }

    /**
     * Creates the server bootstrap, sets the {@code child.tcpNoDelay} and
     * {@code child.keepAlive} options, installs the pipeline factory and binds
     * every registered port.
     */
    @Override
    public void boot(){
        this.initBootStrap();
        this.serverbootstrap.setOption("child.tcpNoDelay", true);
        this.serverbootstrap.setOption("child.keepAlive", true);
        this.serverbootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                // The router field is read each time a pipeline is built.
                return Channels.pipeline(new SourceHandler(router));
            }
        });
        // NOTE(review): the channels returned by bind(...) are discarded, so
        // stop() cannot close them. The Netty 3 guide recommends closing all
        // channels before releasing resources - confirm whether they should
        // be tracked (e.g. in a ChannelGroup).
        for (int port : this.ports) {
            this.serverbootstrap.bind(new InetSocketAddress(port));
        }
    }

    /**
     * Releases the external resources held by the bootstrap.
     * Does nothing when {@link #boot()} has not been called yet, instead of
     * failing with a {@link NullPointerException}.
     */
    @Override
    public void stop(){
        if (this.serverbootstrap == null) {
            return;
        }
        this.serverbootstrap.releaseExternalResources();
    }

    /** Builds the NIO server channel factory (two cached thread pools) and the bootstrap. */
    private void initBootStrap(){
        ChannelFactory factory = new NioServerSocketChannelFactory(
                Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool());
        this.serverbootstrap = new ServerBootstrap(factory);
    }
}
/**
 * Client side of the router: uses a {@link ClientBootstrap} to connect to
 * every registered remote address and installs a {@link SinkHandler} in each
 * pipeline the bootstrap creates.
 */
final class SinkNodes implements Bootable , Routable {

    /** Remote endpoints to connect to, in registration order. */
    private final List<SinkAddress> addresses = new ArrayList<SinkAddress>();

    /** Created by {@link #boot()}; {@code null} until then. */
    private ClientBootstrap clientbootstrap;

    private Router router;

    /**
     * Sets the router passed to every {@link SinkHandler} created afterwards.
     *
     * @param router router shared with the handlers
     */
    @Override
    public void addRouter(final Router router){
        this.router = router;
    }

    /**
     * Registers a remote endpoint to connect to on {@link #boot()}.
     *
     * @param hostAddress remote host name or address
     * @param port remote port number
     * @return this instance, for chaining
     */
    public SinkNodes addRemoteAddress(String hostAddress,int port){
        this.addresses.add(new SinkAddress(hostAddress,port));
        return this;
    }

    /**
     * Creates the client bootstrap, sets the {@code tcpNoDelay} and
     * {@code keepAlive} options, installs the pipeline factory and starts a
     * connection to every registered address.
     */
    @Override
    public void boot(){
        this.initBootStrap();
        this.clientbootstrap.setOption("tcpNoDelay", true);
        this.clientbootstrap.setOption("keepAlive", true);
        this.clientbootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                // The router field is read each time a pipeline is built.
                return Channels.pipeline(new SinkHandler(router));
            }
        });
        // The value returned by connect(...) is discarded here; the outcome of
        // each connection attempt is not inspected in this method.
        for (SinkAddress address : this.addresses) {
            this.clientbootstrap.connect(
                    new InetSocketAddress(address.hostAddress(), address.port()));
        }
    }

    /**
     * Releases the external resources held by the bootstrap.
     * Does nothing when {@link #boot()} has not been called yet, instead of
     * failing with a {@link NullPointerException}.
     */
    @Override
    public void stop(){
        if (this.clientbootstrap == null) {
            return;
        }
        this.clientbootstrap.releaseExternalResources();
    }

    /** Builds the NIO client channel factory (two cached thread pools) and the bootstrap. */
    private void initBootStrap(){
        ChannelFactory factory = new NioClientSocketChannelFactory(
                Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool());
        this.clientbootstrap = new ClientBootstrap(factory);
    }

    /**
     * Immutable host/port pair. Declared static because it never touches the
     * enclosing {@link SinkNodes} instance.
     */
    private static final class SinkAddress {
        private final String hostAddress;
        private final int port;

        public SinkAddress(String hostAddress, int port) {
            this.hostAddress = hostAddress;
            this.port = port;
        }

        public String hostAddress() { return this.hostAddress; }

        public int port() { return this.port; }
    }
}
/**
 * Channel handler installed in the pipelines created by {@link SourceNodes}.
 * It currently only prints a line per event; the router is stored but not
 * used yet.
 */
class SourceHandler extends SimpleChannelHandler {

    /** Router supplied at construction; not referenced by any callback yet. */
    private final Router router;

    /**
     * @param router router this handler is associated with
     */
    public SourceHandler(Router router){
        this.router = router;
    }

    // NOTE(review): this handler is installed through the server bootstrap's
    // pipeline factory. Per the Netty 3 docs that pipeline belongs to accepted
    // (child) channels, so this callback may never fire here and the
    // "Server is opened" text in channelOpen may be misleading - confirm.
    @Override
    public void childChannelOpen(ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception {
        System.out.println("child is opened");
    }

    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("child is closed");
    }

    @Override
    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("Server is opened");
    }

    /**
     * Prints the cause and closes the channel the exception occurred on, so a
     * failed connection is not left open.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        System.out.println(e.getCause());
        e.getChannel().close();
    }

    /** Prints a notice; the message itself is not processed or routed yet. */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        System.out.println("channel received message");
    }
}
/**
 * Channel handler installed in the pipelines created by {@link SinkNodes}.
 * It currently only prints a line per event; the router is stored but not
 * used yet.
 */
class SinkHandler extends SimpleChannelHandler {

    /** Router supplied at construction; not referenced by any callback yet. */
    private final Router router;

    /**
     * @param router router this handler is associated with
     */
    public SinkHandler(Router router){
        this.router = router;
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("Channel is connected");
    }

    /**
     * Prints the cause and closes the channel the exception occurred on, so a
     * failed connection is not left open.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        System.out.println(e.getCause());
        e.getChannel().close();
    }

    /** Prints a notice; the message itself is not processed or routed yet. */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        System.out.println("channel received message");
    }
}
/**
 * Intended meeting point between source and sink handlers. Both submit
 * methods are placeholders: they perform no routing and always return
 * {@code false}.
 */
final class Router {

    /** Configuration supplied at construction; {@code null} with the no-arg constructor. */
    private Configurations configurations;

    // Registries for the two sides. Nothing reads or writes them yet, so the
    // key and value types are left as Object until the routing logic exists.
    private final Map<Object, Object> sourcenodes = new HashMap<Object, Object>();
    private final Map<Object, Object> sinknodes = new HashMap<Object, Object>();

    /** Creates a router without configuration. */
    public Router(){}

    /**
     * @param configurations configuration to keep for later use
     */
    public Router(Configurations configurations){
        this.configurations = configurations;
    }

    /**
     * Placeholder for handling an event coming from a source channel.
     *
     * @param ctx handler context of the originating channel (unused)
     * @param e   message event (unused)
     * @return always {@code false}
     */
    public synchronized boolean submitSource(ChannelHandlerContext ctx , MessageEvent e){
        return false;
    }

    /**
     * Placeholder for handling an event coming from a sink channel.
     *
     * @param ctx handler context of the originating channel (unused)
     * @param e   message event (unused)
     * @return always {@code false}
     */
    public synchronized boolean submitSink(ChannelHandlerContext ctx , MessageEvent e){
        return false;
    }
}
/**
 * Placeholder for router configuration. It carries no state and no
 * behaviour yet.
 */
final class Configurations {

    /** Creates an empty configuration. */
    public Configurations() {
    }
}
interface Bootable {
public abstract void boot();
public abstract void stop();
}
/**
 * Contract for components that accept a {@link Router}.
 */
interface Routable {

    /**
     * Hands the router to this component.
     *
     * @param router router to associate with the component
     */
    void addRouter(Router router);
}