2012-07-03 74 views
0

我想使用NIO向/從遠程機器發送/接收數據。我可以在任何時候發送或接收數據,當我需要發送數據時,只需發送數據而不需要遠程機器的任何查詢,而遠程機器則會定期向我發送數據。我不明白NIO機制。什麼在Selector SelectionKey上生成並讀取或寫入事件?是否可以在我身邊只使用一個ServerSocketChannel,從遠程機器讀取數據並將數據寫入它?這是我的理解,但我不知道如何觸發寫作事件......感謝您的解釋。Java遠程機器上的NIO讀寫

我已經做了一些編碼,我可以讀取從遠端機器進來的數據,但不能寫入。我使用選擇器,我不知道如何寫數據。記錄的消息「句柄寫入」從不寫入,但在wireshark中,我可以看到我的數據包。

public class ServerSelector { 

    private static final Logger logger = Logger.getLogger(ServerSelector.class.getName()); 
    private static final int TIMEOUT = 3000; // Wait timeout (milliseconds) 
    private static final int MAXTRIES = 3; 
    private final Selector selector; 

    public ServerSelector(Controller controller, int... servPorts) throws IOException { 
     if (servPorts.length <= 0) { 
      throw new IllegalArgumentException("Parameter(s) : <Port>..."); 
     } 
     Handler consolehHandler = new ConsoleHandler(); 
     consolehHandler.setLevel(Level.INFO); 
     logger.addHandler(consolehHandler); 

     // Create a selector to multiplex listening sockets and connections 
     selector = Selector.open(); 

     // Create listening socket channel for each port and register selector 
     for (int servPort : servPorts) { 
      ServerSocketChannel listnChannel = ServerSocketChannel.open(); 
      listnChannel.socket().bind(new InetSocketAddress(servPort)); 

      listnChannel.configureBlocking(false); // must be nonblocking to register 
      // Register selector with channel. The returned key is ignored 
      listnChannel.register(selector, SelectionKey.OP_ACCEPT); 
     } 

     // Create a handler that will implement the protocol 
     IOProtocol protocol = new IOProtocol(); 

     int tries = 0; 
     // Run forever, processing available I/O operations 
     while (tries < MAXTRIES) { 
      // Wait for some channel to be ready (or timeout) 
      if (selector.select(TIMEOUT) == 0) { // returns # of ready chans 
       System.out.println("."); 
       tries += 1; 
       continue; 
      } 

      // Get iterator on set of keys with I/O to process 
      Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator(); 
      while (keyIter.hasNext()) { 
       SelectionKey key = keyIter.next(); // Key is a bit mask 
       // Server socket channel has pending connection requests? 
       if (key.isAcceptable()) { 
        logger.log(Level.INFO, "handle accept"); 
        protocol.handleAccept(key, controller); 
       } 

       // Client socket channel has pending data? 
       if (key.isReadable()) { 
        logger.log(Level.INFO, "handle read"); 
        protocol.handleRead(key); 
       } 

       // Client socket channel is available for writing and 
       // key is valid (i.e., channel not closed) ? 
       if (key.isValid() && key.isWritable()) { 
        logger.log(Level.INFO, "handle write"); 
        protocol.handleWrite(key); 
       } 
       keyIter.remove(); // remove from set of selected keys 
       tries = 0; 
      } 
     } 
    } 
} 

協議

public class IOProtocol implements Protocol { 

    private static final Logger logger = Logger.getLogger(IOProtocol.class.getName()); 

    IOProtocol() { 
     Handler consolehHandler = new ConsoleHandler(); 
     consolehHandler.setLevel(Level.INFO); 
     logger.addHandler(consolehHandler); 
    } 

    /** 
    * 
    * @param key 
    * @throws IOException 
    */ 
    @Override 
    public void handleAccept(SelectionKey key, Controller controller) throws IOException { 
     SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); 
     clntChan.configureBlocking(false); // Must be nonblocking to register 
     controller.setCommChannel(clntChan); 
     // Register the selector with new channel for read and attach byte buffer 
     SelectionKey socketKey = clntChan.register(key.selector(), SelectionKey.OP_READ | SelectionKey.OP_WRITE, controller); 
    } 

    /** 
    * Client socket channel has pending data 
    * 
    * @param key 
    * @throws IOException 
    */ 
    @Override 
    public void handleRead(SelectionKey key) throws IOException { 
     Controller ctrller = (Controller)key.attachment(); 
     try { 
      ctrller.readData(); 
     } catch (CommandUnknownException ex) { 
      logger.log(Level.SEVERE, null, ex); 
     } 
     key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); 
    } 

    /** 
    * Channel is available for writing, and key is valid (i.e., client channel 
    * not closed). 
    * 
    * @param key 
    * @throws IOException 
    */ 
    @Override 
    public void handleWrite(SelectionKey key) throws IOException { 

     Controller ctrl = (Controller)key.attachment(); 
     ctrl.writePendingData(); 
     if (!buf.hasRemaining()) { // Buffer completely written ? 
      // Nothing left, so no longer interested in writes 
      key.interestOps(SelectionKey.OP_READ); 
     } 
    buf.compact(); 
    } 
} 

控制器

/** 
    * Fill buffer with data. 
    * @param msg The data to be sent 
    * @throws IOException 
    */ 
    private void writeData(AbstractMsg msg) throws IOException { 
//   
     writeBuffer = ByteBuffer.allocate(msg.getSize() + 4); 
     writeBuffer.putInt(msg.getSize()); 
     msg.writeHeader(writeBuffer); 
     msg.writeData(writeBuffer); 
     logger.log(Level.INFO, "Write data - message size : {0}", new Object[]{msg.getSize()}); 
     logger.log(Level.INFO, "Write data - message : {0}", new Object[]{msg}); 
    } 

    /** 
    * Write to the SocketChannel 
    * @throws IOException 
    */ 
    public void writePendingData() throws IOException { 
     commChannel.write(writeBuffer); 
    } 

回答

0

ServerSocketChannel用於建立連接,但不發送數據。每個連接需要一個ServerSocketChannel和一個SocketChannel。使用SocketChannel閱讀和寫作的

例子:

ByteBuffer buf = ByteBuffer.allocate(48); 
int bytesRead = socketChannel.read(buf); 

你的程序將睡覺的第二行,直到數據會來。您需要將此代碼放入無限循環,並在後臺運行Thread。當數據到達時,您可以從此線程處理數據,然後等待另一個數據到來。

ByteBuffer buf = ByteBuffer.allocate(48); 
buf.clear(); 
buf.put("Hello!".getBytes()); 

buf.flip(); 

while(buf.hasRemaining()) { 
    channel.write(buf); 
} 

沒有阻塞的方法,因此,如果您發送的小字節的緩衝區,你可以從你的主Thread調用它。

Source

地址: 不要設置新的連接OP_WRITE關鍵。只有OP_READ。當你想寫一些數據時,你需要通知選擇器你想發送一些東西並且在事件循環中發送它。好的解決方法是製作輸出消息的Queue。然後,按照此步驟:

  • 將數據添加到Queue
  • 設置OP_WRITE到通道的鍵
  • while (keyIter.hasNext())循環你必須writable key,從隊列中寫的所有數據,並刪除OP_WRITE關鍵。

我很難理解你的代碼,但我想你會發現問題所在。另外如果你想只有一個連接,則不需要使用Selector。這很奇怪,你很少綁定ServerSocketChannels

+0

我貼了一些代碼 – jerome

+0

編輯我的回答 – alaster

+0

這些方法都是阻塞的,除非你把通道設置爲非阻塞模式。沒有必要「通知選擇器你想發送什麼」。傳出消息的隊列沒有必要。太不準確。 -1。 – EJP

0

我會建議你使用阻塞NIO(這是一個SocketChannel的默認行爲)你不需要使用選擇但你可以使用一個線程閱讀,另一個線程寫作。


根據你的例子。

private final ByteBuffer writeBuffer = ByteBuffer.allocateDirect(1024*1024); 

private void writeData(AbstractMsg msg) { 
    writeBuffer.clear(); 
    writeBuffer.putInt(0); // set later 
    msg.writeHeader(writeBuffer); 
    msg.writeData(writeBuffer); 
    writeBuffer.putInt(0, writeBuffer.position()); 

    writeBuffer.flip(); 
    while(writeBuffer.hasRemaining()) 
     commChannel.write(writeBuffer); 
} 
+0

我發佈了一些代碼,我讀取了數據,但不知道如何將數據寫入遠端機器。 – jerome

+0

我已經根據您的示例添加了一些代碼。順便說一句:這臺機器有多遠並不重要。無論服務器還是客戶端位於同一機器上或世界的另一端,代碼都是相同的。 –

0

什麼生成和讀取或寫入選擇器SelectionKey上的事件?

OP_READ:在套接字接收緩衝區中存在數據或EOS。

OP_WRITE:在套接字發送緩衝區中的空間。

+0

好吧,但是當我想要在套接字上寫入時,似乎無法生成事件。在Selector.selectedKeys的javadoc中,它表示如果有人試圖將元素添加到Set,它會拋出異常。那麼如何生成一個寫作事件呢? – jerome

+0

好吧我要使用handleAccept方法中的SelectionKey,將它傳遞給我的控制器,並將其interestOps設置爲控制器寫入方法中的OP_WRITE。我認爲它會工作。 – jerome

+0

@jerome這隻在你有零長度寫入後纔有必要*。大多數情況下,只需調用'write()。'如果得到一個零結果並且有剩餘的數據,*然後*寄存器'OP_WRITE',排隊緩衝區等,並且當*不*得到一個零結果你必須*註銷* OP_WRITE,因爲它幾乎總是準備好,請參閱我的答案。當發送緩衝區已滿時,您只能得到零。 – EJP