2013-07-09 230 views
1

我正在爲一個Netty客戶端/服務器傳輸字符串原型,現在我想在傳遞到服務器端時將這些字符串傳遞給文件。如何操縱來自Netty服務器/客戶端的消息

客戶:使用

private final String id; 
    private ServerBootstrap bootstrap; 
    private ChannelGroup channelGroup; 
    private MyHandler handler= new MyHandler(); 


    public Server(String id) { 
     this.id = id; 
    } 

    // public methods --------------------------------------------------------- 

    public boolean start() { 
     // Pretty standard Netty startup stuff... 
     // boss/worker executors, channel factory, channel group, pipeline, ... 
     Executor bossPool = Executors.newCachedThreadPool(); 
     Executor workerPool = Executors.newCachedThreadPool(); 
     ChannelFactory factory = 
       new NioServerSocketChannelFactory(bossPool, workerPool); 
     this.bootstrap = new ServerBootstrap(factory); 

     this.channelGroup = new DefaultChannelGroup(this.id + "-all-channels"); 


     // declared here to fit under the 80 char limit 
     final ChannelHandler delimiter = 
       new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, 
         Delimiters.lineDelimiter()); 
     this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() { 

      @Override 
      public ChannelPipeline getPipeline() throws Exception { 
       SimpleChannelHandler handshakeHandler = 
         new SimpleChannelHandler(); 
       return Channels.pipeline(
         handler, 
         delimiter, 
         new StringDecoder(), 
         new StringEncoder(), 
         handshakeHandler); 
      } 
     }); 

     Channel acceptor = this.bootstrap.bind(new InetSocketAddress(12345)); 
     if (acceptor.isBound()) { 
      System.out.println("+++ SERVER - bound to *:12345"); 
      this.channelGroup.add(acceptor); 
      return true; 
     } else { 
      System.err.println("+++ SERVER - Failed to bind to *:12345"); 
      this.bootstrap.releaseExternalResources(); 
      return false; 
     } 
    } 

    public void stop() { 
     this.channelGroup.close().awaitUninterruptibly(); 
     this.bootstrap.releaseExternalResources(); 
     System.err.println("+++ SERVER - Stopped."); 
    } 

處理程序:

private ClientBootstrap bootstrap; 
    private Channel connector; 
    private MyHandler handler=new MyHandler(); 

    public boolean start() { 
     // Standard netty bootstrapping stuff. 
     Executor bossPool = Executors.newCachedThreadPool(); 
     Executor workerPool = Executors.newCachedThreadPool(); 
     ChannelFactory factory = 
       new NioClientSocketChannelFactory(bossPool, workerPool); 
     this.bootstrap = new ClientBootstrap(factory); 

     // Declared outside to fit under 80 char limit 
     final DelimiterBasedFrameDecoder frameDecoder = 
       new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, 
         Delimiters.lineDelimiter()); 
     this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() { 
      public ChannelPipeline getPipeline() throws Exception { 
       return Channels.pipeline(
         handler, 
         frameDecoder, 
         new StringDecoder(), 
         new StringEncoder()); 
      } 
     }); 

     ChannelFuture future = this.bootstrap 
       .connect(new InetSocketAddress("localhost", 12345)); 
     if (!future.awaitUninterruptibly().isSuccess()) { 
      System.out.println("--- CLIENT - Failed to connect to server at " + 
        "localhost:12345."); 
      this.bootstrap.releaseExternalResources(); 
      return false; 
     } 

     this.connector = future.getChannel(); 
     return this.connector.isConnected(); 
    } 

    public void stop() { 
     if (this.connector != null) { 
      this.connector.close().awaitUninterruptibly(); 
     } 
     this.bootstrap.releaseExternalResources(); 
     System.out.println("--- CLIENT - Stopped."); 
    } 

    public boolean sendMessage(String message) { 
     if (this.connector.isConnected()) { 
      // Append \n if it's not present, because of the frame delimiter 
      if (!message.endsWith("\n")) { 
       this.connector.write(message + '\n'); 
      } else { 
       this.connector.write(message); 
      } 
      System.out.print(message); 
      return true; 
     } 

     return false; 
    } 

服務器 客戶端處理程序:

public class MyHandler extends SimpleChannelUpstreamHandler{ 
    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) 
      throws Exception { 
     if(e.getMessage() instanceof String){ 
      System.out.println((String)e.getMessage()); 
     } 
     System.out.println(e.getMessage().toString()); 
    } 
} 

服務器處理器:

@Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) 
      throws Exception { 
     Channel channel= ctx.getChannel(); 
     channel.write(e.getMessage()); 
     if(e.getMessage() instanceof String){ 
      System.out.println((String)e.getMessage()); 
     } 
     System.out.println(e.getMessage().toString()); 
    } 

客戶亞軍:

public static void main(String[] args) throws InterruptedException { 

     final int nMessages = 5; 

     try { 
      Client c = new Client(); 

      if (!c.start()) { 
       return; 
      } 

      for (int i = 0; i < nMessages; i++) { 

       Thread.sleep(1L); 
       c.sendMessage((i + 1) + "\n"); 
      } 
      c.stop(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 

服務器亞軍:

public static void main(String[] args) { 
     final Server s = new Server("server1"); 

     if (!s.start()) { 
      return; 
     } 

     Runtime.getRuntime().addShutdownHook(new Thread() { 
      @Override 
      public void run() { 
       s.stop(); 
      } 
     }); 
    } 

現在我真正需要的是打印,我在客戶端和服務器端的通道上寫的消息,我感到很困惑在這。

回答

1

您的管線創建看起來是錯誤的,在服務器端解碼時,Delimiter需要先來,然後StringDecoder,然後是業務處理程序。您可以通過在這些解碼器和編碼器中添加斷點來解決這個問題。也請看這link關於如何工作的非常好的文檔。

相關問題