2011-11-12 45 views
0

我有使用java nio的問題,並希望有很多java nio知識的人可以幫助我澄清一些錯誤概念。如何讓選擇器在Java中的socketchannel鍵更改nio

我正在使用java nio套接字。使用socketchannel.write()可能會填充寫入緩衝區。在這種情況下,剩餘的緩衝區排隊並且密鑰更改爲OP_WRITE。我的一個場景是隊列長度很長。每次調用selector.select()之前,我都會從另一個名爲pendingRequest的隊列中將密鑰更改爲OP_WRITE。但是我發現讀取過程非常緩慢,在發送處理完成後,有許多消息未寫入,並且仍然在隊列中。如何處理這個問題?

在我的代碼中,我有兩個寫作的地方。一個來自發生器:當它有消息要發佈時,它直接寫入頻道。如果緩衝區已滿,數據將被排隊。第二個地方是調度員:當密鑰可寫入時,它會調用write()來寫入排隊數據。我猜這兩部分可以競爭寫作。我只是覺得我的代碼缺少一些處理來配合兩次寫入。

有沒有解決上述問題的解決方案?我發現在我的代碼中有很多排隊的數據不能寫出來。當密鑰可寫時,生成器可能會再次寫入數據,這會導致排隊的數據變化較少而被寫出。如何使這部分正確?感謝

//在WriteListener()中,寫入代碼被以下三個部分

public synchronized int writeData(EventObject source) {  
    int n = 0; 
    int count = 0; 

    SocketChannel socket = (SocketChannel)source.getSource();  
    ByteBuffer buffer = ((WriteEvent)source).getBuffer(); 
    try { 
     write(socket); 
    } catch (IOException e1) {   
     e1.printStackTrace(); 
    }  

    while (buffer.position()>0) { 
     try {   
       buffer.flip(); 
       n = socket.write(buffer);         
       if(n == 0) { 
         key.interestOps(SelectionKey.OP_WRITE);       synchronized (this.pendingData) { 
          List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socket); 
          if(queue == null) { 
           queue = new ArrayList<ByteBuffer>(); 
           this.pendingData.put(socket, queue); 
         } 
         queue.add(buffer); 

         logger.logInfo("queue length:" + queue.size()); 
        }            
        break; 
       }    
       count += n; 

     } catch (IOException e) {    
      e.printStackTrace(); 
     } finally {      
      buffer.compact();    
     } 
    } 

    if(buffer.position()==0) {      
     key.interestOps(SelectionKey.OP_READ);     
    } 
      return count; 

} 

// ====該寫入方法是用來寫排隊緩衝器

public synchronized int write(SocketChannel sc, ByteBuffer wbuf) {   
    int n = 0; 
    int count = 0; 

    SelectionKey key = sc.keyFor(this.dispatcher.getDemultiplexer().getDemux());     
    while (wbuf.position()>0) {  
     try {   
      wbuf.flip();   

      n = sc.write(wbuf);    

      if(n == 0) {  
        key.interestOps(SelectionKey.OP_WRITE);         
        synchronized (this.pendingData) { 
         List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(sc); 
         if(queue == null) { 
           queue = new ArrayList<ByteBuffer>(); 
           this.pendingData.put(sc, queue); 
         } 
         queue.add(wbuf); 
        } 

        break; 
       }    
       count += n; 

     } catch (IOException e) {    
      e.printStackTrace(); 
     } finally {    

      wbuf.compact();     
     } 
    } 

    if(wbuf.position()==0) {  
     wbuf.clear();    
     key.interestOps(SelectionKey.OP_READ);   
    } 

return n;  
} 

// ====當key.isWritable()爲真時,此方法是Dispatch的回調

public void write(SocketChannel socketChannel) throws IOException {   
    SelectionKey key = socketChannel.keyFor(this.dispatcher.getDemultiplexer().getDemux());  
    synchronized (this.pendingData) {    
     List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socketChannel);    
     if(queue == null || queue.isEmpty()) {     
      // We wrote away all data, so we're no longer interested     
      // in writing on this socket. Switch back to waiting for data.     
      try {      
       if (key!=null)       
        key.interestOps(SelectionKey.OP_READ);     
      } catch(Exception ex) {      
       if (key!=null)       
        key.cancel();     
       }    
     }   

     // Write until there's not more data ...  
     int n = 0; 
     while (queue != null && !queue.isEmpty()) {     
      ByteBuffer buf = (ByteBuffer) queue.get(0); 
      // zero length write, break the loop and wait for next writable time 
      n = write(socketChannel, buf); 

      logger.logInfo("queue length:" + queue.size() + " used time: " + (t2-t1) + " ms."); 

      if(n==0) {    
       break; 
      } 
         queue.remove(0); 

     }   

} 
+0

請張貼一些代碼(最好是[SSCCE](http://sscce.org/)),證明您遇到的問題。 –

+0

最後的方法,即寫入隊列的方法,總是從隊列中刪除緩衝區,即使是「if(n == 0)」。你應該在刪除緩衝區之前做這個測試並打破*,實際上你應該只在緩衝區爲空的時候刪除它。目前你正在丟失數據。 – EJP

+0

在我當前的代碼中,在測試寫入的字節數後刪除(0)。目前的代碼工作,但性能不如它應該做的那樣好。在緩衝區滿後的下一個可寫時間需要多長時間? – susan

回答

0

如果您有消費者呃這太慢了,唯一的選擇可能是斷開它們來保護你的服務器。你不想讓一個壞消費者影響你的其他客戶。

我通常會將發送緩衝區大小增加到如果填充的位置,我關閉連接。這避免了在Java代碼中處理未寫入數據的複雜性,因爲您所做的只是將緩衝區擴大一點點。如果你增加發送緩衝區的大小,你就是在透明地做這個。有可能你甚至不需要使用發送緩衝區大小,缺省值通常是64 KB左右。

+0

我使用以下方法更改socketchannel buffersize。但爲什麼serverSocket只能設置receivebuffer的大小。socket.socket()。setReceiveBufferSize(256 * 1024); socket.socket()。setSendBufferSize(256 * 1024); – susan

+1

@susan ServerSocket允許您更改接收緩衝區大小,該大小由接受的套接字繼承,因此您可以將它們設置爲> 64k。如果你試圖對接受的套接字這麼做,它會失敗,因爲> 64k需要在連接握手期間協商的TCP「窗口縮放」選項。出於同樣的原因,如果您想在客戶端套接字上設置接收緩衝大於64k,則必須在連接它之前執行此操作。您可以隨時設置發送緩衝區,因爲它不需要協議的幫助。 – EJP

+0

@susan更正:它不會'失敗',但64k以上的部分將不會被使用,除非在連接之前設置了大小。 – EJP

0
  1. 您必須確保新數據在已寫入待處理數據之後纔會入隊。

  2. 如果行爲依然存在,那麼您實際上只有兩種選擇:或者以不當行爲爲由斷開客戶端,或者停止爲其生成輸出,直到待辦事項清除。可能都是。

您可以通過熟練的select()超時實現第一個操作。如果select()返回零,則表示要麼沒有註冊的頻道,要麼在超時期間沒有任何事情發生,在這種情況下,您可能想要考慮從全部客戶端斷開連接。如果你有太多的併發客戶端工作太忙,那麼你必須跟蹤每個頻道最後一次被選中的時間,並斷開其最後一次活動時間太久的頻道。

在該超時期限內,您可能會想要停止生產輸出,而他是慢讀。

'長期'的確切定義是作爲讀者的練習而留下的,但十分鐘後想到的是第一個近似值。

+0

我非常感謝你的幫助。它通過在生成器側生成下一條消息之前寫出消息來工作。但對於其他進程,如果緩衝區已滿,請等待unitl再次有空間再寫入。 – susan

相關問題