2012-02-01 22 views
9

我正在使用Netty 3.2.7。我試圖在我的客戶端編寫功能,以便如果在一段時間(例如30秒)之後沒有寫入消息,則將「保持活動」消息發送到服務器。使用WriteTimeoutHandler在Netty中實現保持活動消息

經過一番挖掘,我發現WriteTimeoutHandler應該使我能夠做到這一點。我在這裏找到這個解釋:https://issues.jboss.org/browse/NETTY-79

的Netty的文檔中給出的例子是:

public ChannelPipeline getPipeline() { 
    // An example configuration that implements 30-second write timeout: 
    return Channels.pipeline(
     new WriteTimeoutHandler(timer, 30), // timer must be shared. 
     new MyHandler()); 
} 

在我的測試客戶端,我剛纔這件事。在MyHandler的,我也overrided的exceptionCaught()方法:

public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { 
    if (e.getCause() instanceof WriteTimeoutException) { 
     log.info("Client sending keep alive!"); 
     ChannelBuffer keepAlive = ChannelBuffers.buffer(KEEP_ALIVE_MSG_STR.length()); 
     keepAlive.writeBytes(KEEP_ALIVE_MSG_STR.getBytes()); 
     Channels.write(ctx, Channels.future(e.getChannel()), keepAlive); 
    } 
} 

不管是什麼時間,客戶端不寫什麼渠道,我已經覆蓋了exceptionCaught()方法不會被調用。

望着WriteTimeoutHandler源,其writeRequested()的實現是:

public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) 
     throws Exception { 

    long timeoutMillis = getTimeoutMillis(e); 
    if (timeoutMillis > 0) { 
     // Set timeout only when getTimeoutMillis() returns a positive value. 
     ChannelFuture future = e.getFuture(); 
     final Timeout timeout = timer.newTimeout(
       new WriteTimeoutTask(ctx, future), 
       timeoutMillis, TimeUnit.MILLISECONDS); 

     future.addListener(new TimeoutCanceller(timeout)); 
    } 

    super.writeRequested(ctx, e); 
} 

這裏,似乎這個實施說道,「當被要求寫,做一個新的超時當寫入成功。 ,取消超時。「

使用調試器,它似乎確實發生了這種情況。寫入完成後,超時被取消。這不是我想要的行爲。我想要的行爲是:「如果客戶端在30秒內沒有將任何信息寫入通道,請拋出WriteTimeoutException。」

那麼,這不是WriteTimeoutHandler的用途嗎?這是我從網上閱讀的內容中解讀它的過程,但實現似乎並沒有這樣工作。我用錯了嗎?我應該使用別的東西嗎?在我試圖重寫的同一個客戶端的Mina版本中,我發現sessionIdle()方法被覆蓋以實現我想要的行爲,但此方法在Netty中不可用。

回答

5

我會建議添加IdleStateHandler,然後添加您的自定義實現IdleStateAwareUpstreamHandler它可以對空閒狀態作出反應。這對我來說在很多不同的項目上都非常有效。

的javadoc列出了下面的例子,你可以爲你實現的基本使用方法:

public class MyPipelineFactory implements ChannelPipelineFactory { 

    private final Timer timer; 
    private final ChannelHandler idleStateHandler; 

    public MyPipelineFactory(Timer timer) { 
     this.timer = timer; 
     this.idleStateHandler = new IdleStateHandler(timer, 60, 30, 0); 
     // timer must be shared. 
    } 

    public ChannelPipeline getPipeline() { 
     return Channels.pipeline(
      idleStateHandler, 
      new MyHandler()); 
    } 
} 

// Handler should handle the IdleStateEvent triggered by IdleStateHandler. 
public class MyHandler extends IdleStateAwareChannelHandler { 

    @Override 
    public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) { 
     if (e.getState() == IdleState.READER_IDLE) { 
      e.getChannel().close(); 
     } else if (e.getState() == IdleState.WRITER_IDLE) { 
      e.getChannel().write(new PingMessage()); 
     } 
    } 
} 

ServerBootstrap bootstrap = ...; 
Timer timer = new HashedWheelTimer(); 
... 
bootstrap.setPipelineFactory(new MyPipelineFactory(timer)); 
... 
+0

我能夠在十分鐘內實施您所建議的更改,並且完美地工作。謝謝你,先生! – ImmuneEntity 2012-02-01 21:28:05

+1

Google文檔已移至[IdleStateHandler.html](http://static.netty.io/3.6/api/org/jboss/netty/handler/timeout/IdleStateHandler.html),[IdleStateAwareChannelHandler.html](http:// static.netty.io/3.6/api/org/jboss/netty/handler/timeout/IdleStateAwareChannelHandler.html) – mxro 2012-12-25 01:01:47

8

對於的Netty 4.0及更高版本,你應該像例如延長ChannelDuplexHandlerIdleStateHandler documentation

// An example that sends a ping message when there is no outbound traffic 
// for 30 seconds. The connection is closed when there is no inbound traffic 
// for 60 seconds. 

public class MyChannelInitializer extends ChannelInitializer<Channel> { 
    @Override 
    public void initChannel(Channel channel) { 
     channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(60, 30, 0)); 
     channel.pipeline().addLast("myHandler", new MyHandler()); 
    } 
} 

// Handler should handle the IdleStateEvent triggered by IdleStateHandler. 
public class MyHandler extends ChannelDuplexHandler { 
    @Override 
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 
     if (evt instanceof IdleStateEvent) { 
      IdleStateEvent e = (IdleStateEvent) evt; 
      if (e.state() == IdleState.READER_IDLE) { 
       ctx.close(); 
      } else if (e.state() == IdleState.WRITER_IDLE) { 
       ctx.writeAndFlush(new PingMessage()); 
      } 
     } 
    } 
}