2013-11-04 20 views
1

我在我的申請Netty的幀解碼器損壞上的多個消息

的應用程序的協議是簡單前四個字符將是消息長度則消息中的一個實現的NettyDecoder。

幀解碼器邏輯是

import java.nio.ByteBuffer; 
import org.apache.commons.io.IOUtils; 
import org.jboss.netty.buffer.ChannelBuffer; 
import org.jboss.netty.channel.Channel; 
import org.jboss.netty.channel.ChannelHandlerContext; 
import org.jboss.netty.handler.codec.frame.FrameDecoder; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import sun.nio.cs.StandardCharsets; 

public class ITMDecoder extends FrameDecoder { 


public static String bytesToStringUTFCustom(byte[] bytes) { 
    char[] buffer = new char[bytes.length >> 1]; 
    for(int i = 0; i < buffer.length; i++) { 
     int bpos = i << 1; 
     char c = (char)(((bytes[bpos]&0x00FF)<<8) + (bytes[bpos+1]&0x00FF)); 
     buffer[i] = c; 
    } 
    return new String(buffer); 
    } 

protected Object decode(ChannelHandlerContext ctx, Channel channel, 
     ChannelBuffer buf) throws Exception { 

    Logger logger = LoggerFactory.getLogger(ITMDecoder.class); 

    // Make sure if the length field was received. 
    if (buf.readableBytes() < 4) { 
     // The length field was not received yet - return null. 
     // This method will be invoked again when more packets are 
     // received and appended to the buffer. 
     return null; 
    } 


    // The length field is in the buffer. 

    // Mark the current buffer position before reading the length field 
    // because the whole frame might not be in the buffer yet. 
    // We will reset the buffer position to the marked position if 
    // there's not enough bytes in the buffer. 
    buf.markReaderIndex(); 

    // Read the length field. 

    byte[] twoBytesLength = new byte[4]; 

     for(int i = 0 ; i < 4 ; i++) 
     twoBytesLength[i] = buf.getByte(i); 

    String str = new String(twoBytesLength, "UTF-8"); 
    Short shortValue =  Short.parseShort(str); 
    int length = shortValue.intValue() + 4; 
    // Make sure if there's enough bytes in the buffer. 
    if (buf.readableBytes() < length) { 
     // The whole bytes were not received yet - return null. 
     // This method will be invoked again when more packets are 
     // received and appended to the buffer. 

     // Reset to the marked position to read the length field again 
     // next time. 
     buf.resetReaderIndex(); 

     return null; 
    } 

    // There's enough bytes in the buffer. Read it. 

    ChannelBuffer frame = buf.readBytes(length); 

    // Successfully decoded a frame. Return the decoded frame. 
    return frame; 
} 

} 

通道管道的邏輯是:

 ServerBootstrap bootstrap = new ServerBootstrap(
      new NioServerSocketChannelFactory(
     Executors.newCachedThreadPool(), 
     Executors.newCachedThreadPool())); 


    bootstrap.setPipelineFactory(new ChannelPipelineFactory() { 
      public ChannelPipeline getPipeline() throws Exception { 
      return Channels.pipeline(
      new ITMDecoder(), 
      new M3AlertHandler() 
      ); 
      }; 
     }); 

它正常工作時的交易量是大約2 TPS。然而,當事務以更高的tps發送時,幀解碼器會破壞。

我已經檢查與插座工作臺相同用的2個可變長度

我曾經發送到服務器的消息中的一個長消息是消息1 = 00051234500041234

重複相同的1000次,併發送它在一秒之後解碼器在5/6消息之後破壞?

有什麼我失蹤,這將實際上使其正常工作?

回答

3
for(int i = 0 ; i < 4 ; i++) 
    twoBytesLength[i] = buf.getByte(i); 

你不應該假設第一個字節的索引是0。第一個字節的索引是buf.readerIndex()

for (int i = 0; i < 4; i ++) { 
    twoBytesLength[i] = buf.getByte(buf.readerIndex() + i); 
} 

你甚至可以進一步優化字節緩衝區訪問,但這不是問題的範圍。

+0

謝謝我使用readByte(),而不是我猜這是上面的代碼很多。 – azimyasin