2013-11-26 67 views
0
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { 

    // Check for closing frame 
    if (frame instanceof CloseWebSocketFrame) { 
     handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); 
     return; 
    } 
    if (frame instanceof PingWebSocketFrame) { 
     ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); 
     return; 
    } 
    if (!(frame instanceof TextWebSocketFrame)) { 
     throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass() 
       .getName())); 
    } 

    // Send the uppercase string back. 
    String request = ((TextWebSocketFrame) frame).text(); 
    if (logger.isLoggable(Level.FINE)) { 
     logger.fine(String.format("%s received %s", ctx.channel(), request)); 
    } 
    Message msg = new Message(ctx.channel(), request); 
    ReadQueueHandler.getInstance().addMessageToProcess(msg); 
} 




public class ReadQueueHandler implements Runnable { 
private static int POOL_SIZE = 3; 
private static ReadQueueHandler instance; 

private final BlockingQueue<Message> messageQueue; 
private final ExecutorService threadPool; 
private final int threadPoolSize; 
private final boolean isActive; 

private ReadQueueHandler() { 
    this.threadPoolSize = POOL_SIZE; 
    this.threadPool = Executors.newFixedThreadPool(threadPoolSize); 
    this.messageQueue = new LinkedBlockingQueue<Message>(); 
    isActive = true; 
    initThreadPool(); 
} 

private void initThreadPool() { 
    for (int i = 0; i < this.threadPoolSize; i++) { 
     this.threadPool.execute(this); 
    } 
} 

/** 
* Add message to read queue 
* 
* @param message 
*   - adding message 
*/ 
public void addMessageToProcess(Message message) { 
    if (message != null) { 
     this.messageQueue.add(message); 
    } 
} 

@Override 
public void run() { 
    while (isActive) { 
     Message message = null; 
     try { 
      message = this.messageQueue.take(); 
     } catch (InterruptedException e) { 
      System.out.println("Exceptio " + e); 
      /* 
      * TODO Add logging 
      */ 
     } 
     if (message != null) { 
      Channel channel = message.getChannel(); 
      channel.write(new TextWebSocketFrame("Message handled ")); 
     } 

    } 
} 

public static ReadQueueHandler getInstance() { 
    if (instance == null) { 
     instance = new ReadQueueHandler(); 
    } 
    return instance; 
} 

}Netty的Channel.write不工作

如果我執行的,而不是將數據添加到隊列Channel.write(「東西」),那麼所有正常工作和客戶端獲取數據。但是,如果Channel.write(「」)從另一個線程執行,則不會獲得數據。什麼是理由?通道寫入不能從另一個線程執行?

回答

0

對於我來說,在完成寫操作以保證將其刷新到套接字後,您似乎忘記調用flush()。例如,你可以通過使用來解決這個問題:

channel.writeAndFlush(new TextWebSocketFrame("Message handled ")); 
相關問題