2012-06-01 86 views
6

我使用this教程來構建一個沒有可寫部分的java nio服務器。使用Java的異步服務器NIO

一切工作正常,但有一個有趣的事:

  • 當客戶端發送數據包太快,服務器無法接收所有郵件,服務器始終獲得第一和第二數據包,但不比那更多的。
  • 如果客戶端緩慢發送數據包,服務器將獲取所有數據包。

任何想法?

我正在添加服務器類代碼,如果您需要在下面的代碼中提到的另一個類,我在這裏:)。

NIOServer類:

package server; 
import java.io.IOException; 
import java.net.InetAddress; 
import java.net.InetSocketAddress; 
import java.net.Socket; 
import java.nio.ByteBuffer; 
import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; 
import java.nio.channels.ServerSocketChannel; 
import java.nio.channels.SocketChannel; 
import java.nio.channels.spi.SelectorProvider; 
import java.util.*; 

import javax.xml.parsers.ParserConfigurationException; 

import org.xml.sax.SAXException; 

public class NioServer implements Runnable { 



// The host:port combination to listen on 
    private InetAddress hostAddress; 
    private int port; 

    // The channel on which we'll accept connections 
    private ServerSocketChannel serverChannel; 

    // The selector we'll be monitoring 
    private Selector selector; 

    //the cach will hundle the messages that came 
    private Cache cache; 

    // The buffer into which we'll read data when it's available 
    private ByteBuffer readBuffer = ByteBuffer.allocate(8192); 

    public NioServer(InetAddress hostAddress, int port , Cache cache) throws IOException { 
    this.cache = cache; 
    this.hostAddress = hostAddress; 
    this.port = port; 
    this.selector = this.initSelector(); 
    } 


    private Selector initSelector() throws IOException { 
     // Create a new selector 
     Selector socketSelector = SelectorProvider.provider().openSelector(); 

     // Create a new non-blocking server socket channel 
     this.serverChannel = ServerSocketChannel.open(); 
     serverChannel.configureBlocking(false); 

     // Bind the server socket to the specified address and port 
     InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port); 
     serverChannel.socket().bind(isa); 

     // Register the server socket channel, indicating an interest in 
     // accepting new connections 
     serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT); 

     return socketSelector; 
     } 

    private void accept(SelectionKey key) throws IOException { 
     // For an accept to be pending the channel must be a server socket channel. 
     ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); 

     // Accept the connection and make it non-blocking 
     SocketChannel socketChannel = serverSocketChannel.accept(); 
     Socket socket = socketChannel.socket(); 
     socketChannel.configureBlocking(false); 

     // Register the new SocketChannel with our Selector, indicating 
     // we'd like to be notified when there's data waiting to be read 
     socketChannel.register(this.selector, SelectionKey.OP_READ); 
     } 

    private void read(SelectionKey key) throws IOException { 
     SocketChannel socketChannel = (SocketChannel) key.channel(); 

     // Clear out our read buffer so it's ready for new data 
     this.readBuffer.clear(); 

     // Attempt to read off the channel 
     int numRead; 
     try { 
      numRead = socketChannel.read(this.readBuffer); 
      String test = new String(this.readBuffer.array()); 
      System.out.print(test); 

     } catch (IOException e) { 
      // The remote forcibly closed the connection, cancel 
      // the selection key and close the channel. 
     // key.cancel(); 
     // socketChannel.close(); 
      return; 
     } 

     if (numRead == -1) { 
      // Remote entity shut the socket down cleanly. Do the 
      // same from our end and cancel the channel. 
      key.channel().close(); 
      key.cancel(); 
      return; 
     } 

     // Hand the data off to our worker thread 
     this.cache.processData(this, socketChannel, this.readBuffer.array(), numRead); 
     } 

    public void run() { 
     while (true) { 
      try { 
      // Wait for an event one of the registered channels 

      this.selector.select(); 



      // Iterate over the set of keys for which events are available 
      Iterator selectedKeys = this.selector.selectedKeys().iterator(); 
      while (selectedKeys.hasNext()) { 
       SelectionKey key = (SelectionKey) selectedKeys.next(); 
       selectedKeys.remove(); 

       if (!key.isValid()) { 
       continue; 
       } 

       // Check what event is available and deal with it 
       if (key.isAcceptable()) { 
       this.accept(key); 
       } else if (key.isReadable()) { 
       this.read(key); 
       } 
      } 
      } catch (Exception e) { 
      e.printStackTrace(); 
      } 
     } 
     } 

    public static void main(String[] args) throws ParserConfigurationException, SAXException { 
    try { 
     Cache cache = new Cache(); 
     new Thread(cache).start(); 
     new Thread(new NioServer(null, 9090,cache)).start(); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 
    } 
+3

代碼中必須有一個錯誤。如果您需要更多幫助,請提供更多信息。 –

+0

我沒有現在的代碼,我將在星期天。謝謝 –

+3

TCP不會丟失數據,NIO也不會丟失數據。你要麼不讀取所有的數據,要麼丟掉一些數據。如果沒有一些代碼發表評論,就不可能進一步評論。 – EJP

回答

1

我希望,如果你在讀UDP。請注意您在read方法上處理數據包的速度有多慢。您正在將它們打印到system.out,這是非常緩慢的,並且不確定您能夠以多快的速度將數據處理到processData方法上的其他線程。 This library我寫的可以幫助你做線程間非阻塞通信,如果這是你的滯後來源。您還應該檢查基礎讀取套接字緩衝區的大小。數據包越大越好,在數據包開始丟失之前,你必須快速並且趕上。對於TCP,如果底層套接字緩衝區已滿,您可能會在通道上收到IOException。對於UDP,數據包將悄悄丟棄。

要訪問底層讀取套接字緩衝區大小,你可以這樣做:

final Socket socket = channel.socket(); 
System.out.println(socket.getReceiveBufferSize()); 
socket.setReceiveBufferSize(newSize); 

注:據我所知,Linux的可能需要爲了某些操作系統配置爲您更改底層的緩衝區大小。如果setReceiveBufferSize沒有效果(再次閱讀它,看看它是否被改變),谷歌關於它。 :)