2013-02-20 16 views
0

我正在編寫一個程序來測試Java的網絡API(舊的io vs nio vs nio2)。我的Java套接字代碼破壞數據

我有發送只是兩個值的服務器:

  1. System.nanoTime()
  2. 計數器進行計數所發送的消息數。

客戶端收到此數據後,將遠程System.nanoTime()與本地時間戳進行比較以計算延遲並檢查計數器以確保沒有數據丟失。

由於這只是一個測試,服務器和客戶端都運行在同一個JVM中。 90%的時間數據傳輸正確;然而,每隔一段時間,時間戳就會完全錯誤。它看起來可能是一個溢出/下溢錯誤,但我看不出它是如何引入的。下面是一個錯誤的例子:

ERROR: counter 3, remoteTS -8267580102784516096, localTS 155321716184402, diff 8267735424500700498

注意,本地時間戳155321716184402晚上7點以後轉化爲一點點。但是遠程時間戳只是消極的!如果你看看代碼,我沒有做任何花哨的數學計算,但它不可能是負面的。我也看不到我如何得到溢出錯誤。我認爲這可能是由於大vs小,但是每個值都是錯誤的,而不僅僅是其中的一部分。

的代碼(其是由一個稍微大的測試中提取)如下:

package networkioshootout; 

import static java.lang.System.out; 

import java.io.DataOutputStream; 
import java.io.IOException; 
import java.io.InputStream; 
import java.net.InetAddress; 
import java.net.ServerSocket; 
import java.net.Socket; 
import java.net.UnknownHostException; 
import java.nio.ByteBuffer; 
import java.util.Date; 
import java.util.Random; 
import java.util.concurrent.ExecutionException; 


public class DebugNetwork { 
    private final static int SENDCOUNT = 100; 
    private final static int PORT = 9000; 
    private final static int TESTLOOP = 10; 
    private final static Random rn = new Random(); 

    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException { 
     long currentNanos = System.nanoTime(); 
     long currentMillis = System.currentTimeMillis(); 
     Date now = new Date(); 
     System.out.println(String.format("Current date/time:%s, nanos:%s, millis:%s", 
       now, currentNanos, currentMillis)); 

     //Server 
     new Server().start(); 

     //Client 
     for(int i=0; i< TESTLOOP; i++){ 
      final int DATASIZE = (1+rn.nextInt(99))*8; 
      clientInputstream(DATASIZE); 
     } 
    } 

    private static void clientInputstream(int bufferSize) throws IOException, UnknownHostException { 
     final byte[] internalBuffer = new byte[bufferSize+16] ; 
     final ByteBuffer longExtractor = ByteBuffer.allocate(16); 

     int bytesReadSoFar = 0; 
     long counter = 0; 

     Socket client = new Socket(InetAddress.getLocalHost(), PORT); 
     InputStream in = client.getInputStream(); 

     byte[] data = new byte[bufferSize]; 
     int size = 0; 

     try{ 
      while(-1 != (size = in.read(data))){ 
       for(int i=0; i < size; i++){ 
        internalBuffer[i+bytesReadSoFar] = data[i]; 
       } 
       bytesReadSoFar += size; 

       if(bytesReadSoFar >= 16){ 
        int values = bytesReadSoFar/16; 
        int toRead = values; 
        int remainder = bytesReadSoFar % 16; 

        for(int i=0; i< toRead; i++){ 
         int j = i * 16; 

         //long remoteTS = ByteBuffer.wrap(new byte[]{internalBuffer[j+0],internalBuffer[j+1],internalBuffer[j+2],internalBuffer[j+3],internalBuffer[j+4],internalBuffer[j+5],internalBuffer[j+6],internalBuffer[j+7]}).getLong(); 
         //long remoteCounter = ByteBuffer.wrap(new byte[]{internalBuffer[j+8],internalBuffer[j+9],internalBuffer[j+10],internalBuffer[j+11],internalBuffer[j+12],internalBuffer[j+13],internalBuffer[j+14],internalBuffer[j+15]}).getLong(); 

         //long remoteTS = data[0] | ((int)(data[1]) << 4) | ((int)(data[1]) << 8) | ((int)(data[1]) << 12) | ((int)(data[1]) << 16) | ((int)(data[1]) << 20) | ((int)(data[1]) << 24) ; 

         longExtractor.put(internalBuffer, j, 16); 
         longExtractor.flip(); 
         long remoteTS = longExtractor.getLong(); 
         long remoteCounter = longExtractor.getLong(); 
         longExtractor.clear(); 

         if(remoteCounter != counter){ 
          String error = "ERROR: Expected remote counter to be "+counter+" but it was actually "+remoteCounter; 
          //System.out.println(error); 
          throw new RuntimeException(error); 
         } 
         counter++; 

         long localTS = System.nanoTime(); 
         long latency = localTS - remoteTS; 
         if(Math.abs(latency) > 1200000000) { 
          out.println(String.format("ERROR: counter %s, remoteTS %s, localTS %s, diff %s", 
            counter, remoteTS, localTS, latency)); 
          continue; 
         } 


        } 

        //System.arraycopy(data, toRead, data, 0, remainder); 
        for(int i=0; i < remainder; i++){ 
         internalBuffer[i] = internalBuffer[i+toRead]; 
        } 
        bytesReadSoFar = remainder; 
       } 
      } 
     } 
     finally{ 
      client.close(); 
     } 
    } 

    static final class Server extends Thread{ 

     public void run(){ 
      try { 
       startServer(); 
      } catch (IOException e) { 
       e.printStackTrace(); 
      } 
     } 

     private static void startServer() throws IOException { 
      final ServerSocket server = new ServerSocket(PORT); 

      //System.out.println("Server listening on port "+PORT); 

      while(true){ 
       final Socket c1 = server.accept(); 
       c1.setTcpNoDelay(true); 
       //System.out.println("Client connected"); 
       new Thread(new Runnable() { 

        @Override 
        public void run() { 
         long totalMsgs = 0; 
         long counter = 0; 
         DataOutputStream serverout; 
         try { 
          serverout = new DataOutputStream(c1.getOutputStream()); 
          for(int i=0;i<SENDCOUNT;i++){ 
           serverout.writeLong(System.nanoTime()); 
           serverout.writeLong(counter); 
           totalMsgs++; 
           counter++; 
          } 
          //System.out.println("Sent bytes to client: "+total); 
         } catch (IOException e) { 
          out.println("Messages sent:"+totalMsgs+", current counter:"+counter); 
          e.printStackTrace(); 
         } 
         finally{ 
          //System.out.println("Client disconnected when counter was "+counter); 
          try { c1.close(); } catch (IOException e) { e.printStackTrace();} 
         } 
        } 
       }).start(); 
      } 
     } 
    } 

} 

編輯: 由於已經有關於這個一些註釋,實際程序有連接到服務器的輸入客戶端流,緩衝流,NIO,NIO2。這是一個更完整(但已過時)的程序版本: https://gist.github.com/falconair/4975243

我還沒有添加datainput流,試驗了套接字選項等。我想在我之前解決數據損壞問題繼續前進。

+0

如果'bytesReadSoFar'是40,會發生什麼? – parsifal 2013-02-20 21:05:01

+0

你會發現使用DataInputStream.readLong()而不是所有這些手工編碼,ByteBuffers等代碼更容易編碼,而且它也可能會擺脫你的錯誤。 – EJP 2013-02-20 21:14:49

+0

@EJP在客戶端上使用inputstream是測試的一部分。我也使用緩衝流,選擇器,異步等,所有這些已被刪除這個問題。整點是比較的方法:) – Shahbaz 2013-02-20 21:53:58

回答

1

的錯誤是什麼,與你使用的data[]internalBuffer[]和所有的數據移動。我沒有看到真正的客戶正在使用這種代碼編寫任何東西。任何人都會理智地使用BufferedInputStream

如果你想測試不同的緩衝區大小的效果,使用new DataInputStream(new BufferedInputStream(socket.getInputStream(), bufferSize))readLong()並得到完全擺脫datainternalBufferlongExtractor:他們只是造成不相關的問題。

以下工作flawlessy:

private static void clientInputstream(int bufferSize) throws IOException, UnknownHostException 
{ 
    long counter = 0; 

    Socket client = new Socket(InetAddress.getLocalHost(), PORT); 
    DataInputStream in = new DataInputStream(new BufferedInputStream(client.getInputStream(), bufferSize)); 

    try 
    { 
     for (;;) 
     { 
      long remoteTS = in.readLong(); 
      long remoteCounter = in.readLong(); 
      if (remoteCounter != counter) 
      { 
       String error = "ERROR: Expected remote counter to be " + counter + " but it was actually " + remoteCounter; 
       //System.out.println(error); 
       throw new RuntimeException(error); 
      } 
      counter++; 

      long localTS = System.nanoTime(); 
      long latency = localTS - remoteTS; 
      if (Math.abs(latency) > 1200000000) 
      { 
       out.println(String.format("ERROR: counter %s, remoteTS %s, localTS %s, diff %s", 
        counter, remoteTS, localTS, latency)); 
       continue; 
      } 
     } 
    } 
    catch (EOFException exc) 
    { 
     System.out.println("EOS"); 
    } 
    finally 
    { 
     client.close(); 
    } 
} 
+0

該錯誤發生在循環底部的代碼中,當未使用的字節寫入內部緩衝區時,我正在通過'toRead'而不是'toRead * 16'來移動!感謝您的詳細解答@EJP – Shahbaz 2013-02-21 02:47:20

0

System.nanoTime()返回的值特定於正在運行的JVM。你應該使用System.currentTimeMillis()來代替。

This method can only be used to measure elapsed time and is not related to any other notion of system or wall-clock time. The value returned represents nanoseconds since some fixed but arbitrary origin time (perhaps in the future, so values may be negative). The same origin is used by all invocations of this method in an instance of a Java virtual machine; other virtual machine instances are likely to use a different origin.

編輯:

既然你正在運行測試在同一個JVM,(這)錯誤的來源必須是從什麼是上述(不同但應考慮使用'currentTimeMillis'使這些值在不同的JVM中是可比的)。

我建議使用BufferedInputStream緩衝流,然後每次讀取和處理N(16?)個字節的塊。

Socket client = new Socket(InetAddress.getLocalHost(), PORT); 
InputStream in = new BufferedInputStream(client.getInputStream()); 

int length = 16, offset=0;  
while (length>0) { 
    int read = in.read(data,offset,length); 
    if (read<0) ... //connection error 
    offset+=read; 
    length-=read; 
} 
+1

正如OP所說,客戶端和服務器*都運行在同一個JVM中。 – parsifal 2013-02-20 21:07:34

+0

@parsifal的好處(儘管在代碼部署在不同的JVM中後會出現問題)。 – Javier 2013-02-20 21:17:31

+0

@Javier這個測試實際上有幾個客戶端的實現,其中一個是緩衝輸入。我正在測試從服務器獲取數據的各種方法。 – Shahbaz 2013-02-20 21:56:15