2011-07-22 47 views
0

使用JBOSS Netty,我試圖連續發送數據到連接的客戶端。在下面的例子中, 一旦客戶端連接(channelConnected),我會嘗試每5秒發送一次到客戶端。Jboss Netty - 無法連續發送數據?

但這不起作用。它只有在我評論while循環時纔有效。

import java.net.InetAddress; 
    import java.net.InetSocketAddress; 
    import java.util.Date; 
    import java.util.concurrent.Executors; 
    import java.util.logging.Level; 
    import java.util.logging.Logger; 

    import org.jboss.netty.bootstrap.ServerBootstrap; 
    import org.jboss.netty.channel.ChannelHandlerContext; 
    import org.jboss.netty.channel.ChannelPipeline; 
    import org.jboss.netty.channel.ChannelPipelineFactory; 
    import org.jboss.netty.channel.ChannelStateEvent; 
    import org.jboss.netty.channel.Channels; 
    import org.jboss.netty.channel.ExceptionEvent; 
    import org.jboss.netty.channel.SimpleChannelUpstreamHandler; 
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; 
    import org.jboss.netty.handler.codec.string.StringEncoder; 

    public class SRNGServer { 

     public static void main(String[] args) throws Exception { 
      // Configure the server. 
      ServerBootstrap bootstrap = new ServerBootstrap(
        new NioServerSocketChannelFactory(
          Executors.newCachedThreadPool(), 
          Executors.newCachedThreadPool())); 

      // Configure the pipeline factory. 
      bootstrap.setPipelineFactory(new SRNGServerPipelineFactoryP()); 

      // Bind and start to accept incoming connections. 
      bootstrap.bind(new InetSocketAddress(8080)); 
     } 



     private static class SRNGServerHandlerP extends SimpleChannelUpstreamHandler { 

     private static final Logger logger = Logger.getLogger(SRNGServerHandlerP.class.getName()); 


     @Override 
     public void channelConnected(
       ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 
      // Send greeting for a new connection. 
      e.getChannel().write("Welcome to " + InetAddress.getLocalHost().getHostName() + "!\r\n"); 

      while(true){ 
      e.getChannel().write("It is " + new Date() + " now.\r\n"); 

      Thread.sleep(1000*5); 
      } 
     } 

     @Override 
     public void exceptionCaught(
       ChannelHandlerContext ctx, ExceptionEvent e) { 
      logger.log(
        Level.WARNING, 
        "Unexpected exception from downstream.", 
        e.getCause()); 
      e.getChannel().close(); 
     } 
     } 



     private static class SRNGServerPipelineFactoryP implements ChannelPipelineFactory { 

     public ChannelPipeline getPipeline() throws Exception { 

      // Create a default pipeline implementation. 
      ChannelPipeline pipeline = Channels.pipeline(); 

      pipeline.addLast("encoder", new StringEncoder()); 
      pipeline.addLast("handler", new SRNGServerHandlerP()); 

      return pipeline; 
     } 
     } 

    } 
+0

不工作,你得到的結果是什麼? – prusswan

+0

@Prusswan:在輸出中看不到任何東西......只有當channelConnected方法「返回」並且堆棧中的任何東西被執行時才顯示輸出,因爲這是事件驅動的。 – FatherFigure

回答

0

看來,I/O線程被阻塞睡眠的結果,所以儘量使用2個工作線程,而不是:

ServerBootstrap bootstrap = new ServerBootstrap(
    new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), 
     Executors.newCachedThreadPool(), 2)); 
+0

:這就是工作線程的最大數量......我其實有4個。 – FatherFigure

3

的Netty的文件實際上規定,你不應該做處理程序因爲它最終可能會陷入僵局。原因是處理程序方法是由I/O線程直接調用的。 Netty中的一個I/O線程按順序執行多個I/O操作,因此每個操作不是一個線程。 在channelConnected方法中,您應該通過引用通道來啓動一個新線程,並使該線程每5秒發送一次。這會爲每個連接產生一個線程。 或者,您可以讓一個線程每隔5秒在客戶端列表上循環並按時間順序將時間發送給每個客戶端。 無論如何,重要的是使用不同的線程來發送,而不是調用Handler的線程。

+0

感謝您的信息。請看看我發佈的soln ... – FatherFigure

+0

我認爲它應該工作,這是爲什麼:當調用write方法時,您將寫入事件添加到Netty的I/O事件隊列中。但它不會立即啓動,因爲您仍處於channelConnected方法中。一旦你離開它,寫入事件將觸發,當它完成時,它會調用你定義的監聽器,但是會在一個新的線程中。這是新的線程將睡5秒,而不是Netty的I/O線程。 – fsaftoiu

1

對於它的價值,我想出瞭解決方案,這裏是工作代碼。在「寫入」時間之後,我用ChannelFuturelistener註冊了未來。然後從操作完成我不斷註冊每個寫作的新未來。這適用於我想要完成的任務,無需使用任何額外的線程。

import java.net.InetSocketAddress; 
    import java.nio.channels.ClosedChannelException; 
    import java.util.Date; 
    import java.util.concurrent.Executors; 
    import java.util.logging.Level; 
    import java.util.logging.Logger; 

    import org.jboss.netty.bootstrap.ServerBootstrap; 
    import org.jboss.netty.channel.Channel; 
    import org.jboss.netty.channel.ChannelFuture; 
    import org.jboss.netty.channel.ChannelFutureListener; 
    import org.jboss.netty.channel.ChannelHandlerContext; 
    import org.jboss.netty.channel.ChannelPipeline; 
    import org.jboss.netty.channel.ChannelPipelineFactory; 
    import org.jboss.netty.channel.ChannelStateEvent; 
    import org.jboss.netty.channel.Channels; 
    import org.jboss.netty.channel.ExceptionEvent; 
    import org.jboss.netty.channel.SimpleChannelUpstreamHandler; 
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; 
    import org.jboss.netty.handler.codec.string.StringEncoder; 

    public class SRNGServer { 

     public static void main(String[] args) throws Exception { 
      // Configure the server. 
      ServerBootstrap bootstrap = new ServerBootstrap(
        new NioServerSocketChannelFactory(
          Executors.newCachedThreadPool(), 
          //Executors.newCachedThreadPool() 
          Executors.newFixedThreadPool(2),2 
         )); 

      // Configure the pipeline factory. 
      bootstrap.setPipelineFactory(new SRNGServerPipelineFactoryP()); 

      // Bind and start to accept incoming connections. 
      bootstrap.bind(new InetSocketAddress(8080)); 
     } 



     private static class SRNGServerHandlerP extends SimpleChannelUpstreamHandler { 

     private static final Logger logger = Logger.getLogger(SRNGServerHandlerP.class.getName()); 


     @Override 
     public void channelConnected(
       ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 

      // Send greeting for a new connection. 
      Channel ch=e.getChannel(); 
      ChannelFuture writeFuture=e.getChannel().write("It is " + new Date() + " now.\r\n"); 

      SRNGChannelFutureListener srngcfl=new SRNGChannelFutureListener(); 

      writeFuture.addListener(srngcfl);  

     } 

     @Override 
     public void exceptionCaught(
       ChannelHandlerContext ctx, ExceptionEvent e) { 

      logger.log(
        Level.WARNING, 
        "Unexpected exception from downstream.", 
        e.getCause()); 
      if(e.getCause() instanceof ClosedChannelException){ 
       logger.log(Level.INFO, "****** Connection closed by client - Closing Channel"); 
      } 
      e.getChannel().close(); 
     } 
     } 



     private static class SRNGServerPipelineFactoryP implements ChannelPipelineFactory { 

     public ChannelPipeline getPipeline() throws Exception { 

      // Create a default pipeline implementation. 
      ChannelPipeline pipeline = Channels.pipeline(); 

      pipeline.addLast("encoder", new StringEncoder()); 
      pipeline.addLast("handler", new SRNGServerHandlerP()); 

      return pipeline; 
     } 
     } 


     private static class SRNGChannelFutureListener implements ChannelFutureListener{ 

     public void operationComplete(ChannelFuture future) throws InterruptedException{ 
      Thread.sleep(1000*5); 
      Channel ch=future.getChannel(); 
      if(ch!=null && ch.isConnected()){ 
       ChannelFuture writeFuture=ch.write("It is " + new Date() + " now.\r\n"); 
       //-- Add this instance as listener itself. 
       writeFuture.addListener(this); 
      } 

     } 

     } 
    }