2013-05-21 49 views
1

大家好日子!tcp服務器意外的行爲:爲什麼增加延遲增加服務客戶端的能力?

我的問題是關於基於NIO的服務器,我的情況如下:服務器從100個客戶端(100個客戶端線程)讀取消息,每個消息發送100條消息。因此,消息的總數是100x100 = 10000。我的服務器中有傳入消息計數器,它在從某個客戶端讀取一些消息後增加。當我剛剛閱讀消息時,我的服務器讀取大約9200條消息。 當我爲仿真服務延遲添加虛擬循環時,我的服務器驚人地服務於所有10000條消息!

我的期望是這樣的 - 好吧,服務器設法讀取所有10000條消息,即使時間很短。所以,沒有這個延遲服務器可能會讀取更多的消息(服務更多的客戶端)。但是你看,這是錯誤的。毫不拖延地事情變得更糟。 Here我描述了我的建築。在我目前的邏輯中唯一的修改是將接受客戶端和讀取消息分成2個不同的線程:一個選擇器在一個線程中接受客戶端,第二個選擇器在其他線程中等待來自連接客戶端的消息。

客戶機代碼

public class TCPClient implements Runnable{ 

private String name; 
private static TCPClient[] clients; 
private static Thread[] threads; 
private int counter = 0; 

public TCPClient(String name) 
{ 
    this.name = name; 
    this.counter = 0; 
} 

public static void main(String[] args) throws Exception 
{ 
    clients = new TCPClient[100]; 
    threads = new Thread[100]; 
    for(int i=0;i<100;i++) 
    { 
     clients[i] = new TCPClient("thread # "+Integer.toString(i)); 
     threads[i] = new Thread(clients[i]); 
     threads[i].start(); 
     // Thread.sleep(500); 
    } 
    for(int i=0;i<100;i++) 
    { 
     threads[i].join(); 
    } 
    for(int i=0;i<100;i++){ 
     System.out.println("counter = "+clients[i].counter); 
    } 

} 

@Override 
public void run() 
{ 
    Socket socket = null; 
    OutputStream out = null; 

    try 
    { 
     socket = new Socket(); 
     socket.connect(new InetSocketAddress("192.168.3.109",2345), 0); 
     out = socket.getOutputStream(); 

     byte[] bytes; 
     while(counter < 100) 
     { 
      counter++; 
      bytes = (name+ ", message # "+Integer.toString(counter)+System.lineSeparator()).getBytes(); 
      // System.out.println(counter); 
      out.write(bytes); 
      out.flush(); 
      Thread.sleep(200); 
     } 
    } 
    catch(Exception ex) 
    { 
      System.out.println(name+" "+Integer.toString(counter)); 
      ex.printStackTrace(new PrintStream(System.out)); 
      System.out.println(); 
    } 
    finally 
    { 
     if(socket!=null && out!=null) 
     { 
      try 
      { 
       socket.close(); 
       out.close(); 
      } 
      catch(Exception ex) 
      { 
       System.out.println("client close error"); 
      } 
     } 
    } 
} 

} 

服務器代碼(消息讀取部)

@Override 
public void run() 
{ 
    isRunning = true; 
    int acc = 0; 
    boolean error = false; 
    while (isRunning) { 
     try 
     { 


      selector.select(); 

      Set keys = selector.selectedKeys(); 
      Iterator it = keys.iterator(); 
      while(it.hasNext()) 
      { 
       SelectionKey key = (SelectionKey)it.next(); 



       if (key.isReadable()) 
       { 
         //readMessage(key); 
         //key.cancel(); 

         // ByteBuffer bbb = ByteBuffer.allocate(2048); 
         // key.cancel(); 
         curTime = System.currentTimeMillis(); 
         SocketChannel sc = (SocketChannel) key.channel(); 
         // System.out.println("before reading"); 
         bb.clear(); 
         int x = sc.read(bb); 

         if(x==-1) 
         { 
          key.cancel(); 
          //System.out.println("cancelling key"); 
          continue; 
         } 

         counter++; 
         // bb.flip(); 
         //System.out.print(decoder.decode(bb).toString()); 
        // Thread.sleep(20); 
         long sum=0; 
         for(int dummy=0;dummy<250000;dummy++) 
         { 
          sum += dummy; 
         // sum %= 1005; 
         } 
         long delta = System.currentTimeMillis() - curTime; 
         serviceTime += delta; 

         if(counter>9000) 
         { 
          System.out.println("recieved messages count = "+counter); 
          System.out.println("one message service time = "+delta+" milliseconds"); 
          System.out.println("total service time = "+serviceTime+" milliseconds"); 
          System.out.println("sum = "+sum); //11 249 925 000 
         } 

        // selector.wakeup(); 
         //key.interestOps(SelectionKey.OP_READ); 


       } 
      } 

      keys.clear(); 





     } 
     catch (Exception ex) 
     {  
      error = true; 
      System.out.println("error in recieving messages "+ex.getMessage()); 
      ex.printStackTrace(new PrintStream(System.out)); 
     // logger.println("error in recieving messages "+ex.getMessage()); 
     // logger.flush(); 
     } 
     finally 
     { 
      //if(error) // !!!!!!!!!!! DO NOT STOP THE SERVER EDIT IT LATER 
      //stopServer(); 
     } 
    } 
} 

可能有用的信息 - 在客戶端側的各2個消息之間的延遲是200毫秒。當虛擬循環使200000-220000迭代服務器完美工作。順便說一句,200000次迭代約爲200毫秒 - 因爲客戶端數量是100,所以在一個select()中的延遲是100×200000 = 200萬次迭代 - 對於現代PC是200毫秒。如果虛擬循環使得少於200000次迭代,則服務器讀取〜9200條消息。這種奇怪行爲的原因是什麼?

回答

1

這裏有太多的問題,很難知道從哪裏開始。

  1. read()返回-1時,您必須關閉該通道。取消密鑰是不夠的:你剛剛泄露了一個套接字。

  2. 如果read()返回一個正整數,它是一個讀取計數,您忽略。你假設你有一個完整的信息。

  3. 如果您在某個通道上收到任何IOException的I/O請務必關閉通道。

也許你不是接收整個消息除非你把睡在。否則當你把睡在你同時得到一個以上的消息,讓你更快的處理。

添加睡覺只是修復你的錯誤,它沒有它自己的魔法屬性。

+0

是的,你是對的,我一次獲得多條消息。我仍然無法理解這樣的行爲 - 當服務器讀取速度太快(沒有延遲)時,它看起來像在客戶端的某個緩衝區中積累的消息。換句話說,讀取客戶端消息的速度很快會導致客戶端忽略我的flush()調用。好的,另一個問題 - 如何強制沖水? – Baurzhan

+1

你在做更多的假設。 TCP中沒有這樣的信息,只有字節。TCP讀取有權讀取少至一個字節或多少個符合您提供的緩衝區的內容。沒有消息邊界,特別是'flush()'不會導致消息邊界。如果你想要它們,你必須將它們放入協議中,例如帶有長字前綴STX/ETX,自描述協議,如XML等。 – EJP