2013-05-08 86 views
0

我正在寫一個簡單的路由應用程序。這個想法是,我有服務器或源節點接收持續x時間的瞬態客戶端連接。接收到的消息被解碼,然後根據消息的細節發送到相應的接收節點或已經打開的客戶端。路由器類註冊所有通道並嘗試將它們保存在地圖中,以便它可以過濾並排除消息的目的地。一旦我到達目的地,我應該能夠選擇實際的匯聚節點(根據配置可以是持久性的瞬態),並將數據發送到該通道等待響應,然後將其發送回始發者。我想知道,如果我的實施使用netty是在正確的方向?以及如何傳遞從任何服務器收到的消息並將其發送給任何客戶端並回應到始發源節點?如何在netty中的通道之間傳遞數據?

下面是我的源代碼:它會/應該給你一個我最喜歡的概念:請在你的解釋中使用代碼示例。

import java.net.InetSocketAddress; 
    import java.util.ArrayList; 
    import java.util.HashMap; 
    import java.util.List; 
    import java.util.Map; 
    import java.util.concurrent.Executors; 
    import org.jboss.netty.bootstrap.ClientBootstrap; 
    import org.jboss.netty.bootstrap.ServerBootstrap; 
    import org.jboss.netty.channel.ChannelFactory; 
    import org.jboss.netty.channel.ChannelHandlerContext; 
    import org.jboss.netty.channel.ChannelPipeline; 
    import org.jboss.netty.channel.ChannelPipelineFactory; 
    import org.jboss.netty.channel.ChannelStateEvent; 
    import org.jboss.netty.channel.Channels; 
    import org.jboss.netty.channel.ChildChannelStateEvent; 
    import org.jboss.netty.channel.ExceptionEvent; 
    import org.jboss.netty.channel.MessageEvent; 
    import org.jboss.netty.channel.SimpleChannelHandler; 
    import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; 
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; 

    /* 
    * @author Kimathi 
    */ 

    public class Service { 

     private Nodes nodes; 

     public void start(){ 

      nodes = new Nodes(); 
      nodes.addSourceNodes(new SourceNodes()). 
        addSinkNodes(new SinkNodes()). 
        addConfigurations(new Configurations()). 
        boot(); 
     } 

     public void stop(){ 

      nodes.stop(); 
     } 

     public static void main(String [] args){ 

      new Service().start(); 
     } 

    } 

    class Nodes { 

     private SourceNodes sourcenodes; 

     private SinkNodes sinknodes ; 

     private Configurations configurations; 

     public Nodes addConfigurations(Configurations configurations){ 

      this.configurations = configurations; 

      return this; 
     } 

     public Nodes addSourceNodes(SourceNodes sourcenodes){ 

      this.sourcenodes = sourcenodes; 

      return this; 
     } 

     public Nodes addSinkNodes(SinkNodes sinknodes){ 

      this.sinknodes = sinknodes; 

      return this; 
     } 

     public void boot(){ 

      Router router = new Router(configurations); 

      sourcenodes.addPort(8000). 
         addPort(8001). 
         addPort(8002); 
      sourcenodes.addRouter(router); 
      sourcenodes.boot() ; 

      sinknodes.addRemoteAddress("127.0.0.1", 6000). 
        addRemoteAddress("127.0.0.1", 6001). 
        addRemoteAddress("127.0.0.1", 6002); 
      sinknodes.addRouter(router); 
      sinknodes.boot(); 

     } 

     public void stop(){ 

      sourcenodes.stop(); 

      sinknodes.stop(); 
     } 

    } 

    final class SourceNodes implements Bootable , Routable { 

     private List <Integer> ports = new ArrayList(); 

     private ServerBootstrap serverbootstrap; 

     private Router router; 

     @Override 
     public void addRouter(final Router router){ 

      this.router = router; 
     } 

     public SourceNodes addPort(int port){ 

      this.ports.add(port); 

      return this; 
     } 

     @Override 
     public void boot(){ 

      this.initBootStrap(); 

      this.serverbootstrap.setOption("child.tcpNoDelay", true); 
      this.serverbootstrap.setOption("child.keepAlive", true); 
      this.serverbootstrap.setPipelineFactory(new ChannelPipelineFactory() { 

       @Override 
       public ChannelPipeline getPipeline() throws Exception { 

        return Channels.pipeline(new SourceHandler(router)); 
       } 
      }); 



      for(int port:this.ports){ 
       this.serverbootstrap.bind(new InetSocketAddress(port)); 
      } 
     } 

     @Override 
     public void stop(){ 

      this.serverbootstrap.releaseExternalResources(); 

     } 

     private void initBootStrap(){ 

      ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()); 

      this.serverbootstrap = new ServerBootstrap(factory); 
     } 
    } 

    final class SinkNodes implements Bootable , Routable { 

     private List<SinkAddress> addresses= new ArrayList(); 

     private ClientBootstrap clientbootstrap; 

     private Router router; 

     @Override 
     public void addRouter(final Router router){ 

      this.router = router; 

     } 

     public SinkNodes addRemoteAddress(String hostAddress,int port){ 

      this.addresses.add(new SinkAddress(hostAddress,port)); 

      return this; 
     } 

     @Override 
     public void boot(){ 

      this.initBootStrap(); 

      this.clientbootstrap.setOption("tcpNoDelay", true); 
      this.clientbootstrap.setOption("keepAlive", true); 
      this.clientbootstrap.setPipelineFactory(new ChannelPipelineFactory() { 

       @Override 
       public ChannelPipeline getPipeline() throws Exception { 

        return Channels.pipeline(new SinkHandler(router)); 
       } 
      }); 

      for(SinkAddress address:this.addresses){ 

       this.clientbootstrap.connect(new InetSocketAddress(address.hostAddress(),address.port())); 
      } 
     } 

     @Override 
     public void stop(){ 

      this.clientbootstrap.releaseExternalResources(); 
     } 

     private void initBootStrap(){ 

      ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()); 

      this.clientbootstrap = new ClientBootstrap(factory); 
     } 

     private class SinkAddress { 

      private final String hostAddress; 
      private final int port; 

      public SinkAddress(String hostAddress, int port) { 
       this.hostAddress = hostAddress; 
       this.port = port; 
      } 

      public String hostAddress() { return this.hostAddress; } 
      public int port() { return this.port; } 
     } 
    } 

    class SourceHandler extends SimpleChannelHandler { 

     private Router router; 

     public SourceHandler(Router router){ 

      this.router = router; 
     } 

     @Override 
     public void childChannelOpen(ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception { 

      System.out.println("child is opened"); 
     } 

     @Override 
     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 

      System.out.println("child is closed"); 
     } 

     @Override 
     public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 


       System.out.println("Server is opened"); 

     } 

     @Override 
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 

      System.out.println(e.getCause()); 
     } 

     @Override 
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 


      System.out.println("channel received message"); 

     } 
    } 

    class SinkHandler extends SimpleChannelHandler { 

     private Router router; 

     public SinkHandler(Router router){ 

      this.router = router; 
     } 

     @Override 
     public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 

      System.out.println("Channel is connected"); 
     } 

     @Override 
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 

      System.out.println(e.getCause()); 
     } 

     @Override 
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 

      System.out.println("channel received message"); 

     } 
    } 

    final class Router { 

     private Configurations configurations; 

     private Map sourcenodes = new HashMap(); 

     private Map Sinknodes = new HashMap(); 

     public Router(){} 

     public Router(Configurations configurations){ 

      this.configurations = configurations; 
     } 

     public synchronized boolean submitSource(ChannelHandlerContext ctx , MessageEvent e){ 

      boolean responded = false; 

      return responded; 
     } 

     public synchronized boolean submitSink(ChannelHandlerContext ctx , MessageEvent e){ 

      boolean responded = false; 

      return responded; 
     } 
    } 

    final class Configurations { 

     public Configurations(){} 
    } 

    interface Bootable { 

     public abstract void boot(); 

     public abstract void stop(); 
    } 

    interface Routable { 

     public abstract void addRouter(Router router); 
    } 

回答

0

這個想法似乎是合理的。

源通道處理程序只能使用Channel#write(...)寫入相應的接收通道,反之亦然。

當然,您還需要一種將源通道與回覆關聯的方式,以及如何最好地完成取決於協議的性質。如果可能的話,最好的選擇是以某種方式將消息中的源通道ID編碼到宿信道(當然也包括在回覆中)。

如果這是不可能的,你將不知何故必須保持相關性。如果確保答覆與發送的請求配對,則每個接收器通道的FIFO隊列可能是合適的。

相關問題