2012-10-08 122 views
1

當我真的壓力測試我的網絡代碼時,我遇到了一些問題。從本質上講,一旦插座設置在調用此:C#TcpClient丟失數據包

NetworkStream networkStream = mClient.GetStream(); 
networkStream.BeginRead(buffer, 0, buffer.Length, ReadCallback, buffer); 


private void ReadCallback(IAsyncResult result) 
     { 
      try 
      { 
       int read; 
       NetworkStream networkStream; 
       try 
       { 
        networkStream = mClient.GetStream(); 
        read = networkStream.EndRead(result); 
       } 
       catch 
       { 
        return; 
       } 

       if (read == 0) 
       { 
        //The connection has been closed. 
        return; 
       } 

       var readBuffer = (byte[])result.AsyncState; 
       var readCount = readBuffer.Length; 
       while (readCount < 4) 
       { 
        readCount += networkStream.Read(readBuffer, 0, readBuffer.Length - readCount); 
       } 
       var length = BitConverter.ToInt32(readBuffer, 0); 
       var messageBuffer = new byte[length]; 
       readCount = 0; 
       while (readCount < length) 
       { 
        readCount += networkStream.Read(messageBuffer, 0, messageBuffer.Length - readCount); 
       } 
       else 
       { 
        RaiseMessageReceived(this, messageBuffer); 
       } 
       //Then start reading from the network again. 
       readBuffer = new byte[4]; //may not need to reset, not sure 
       networkStream.BeginRead(readBuffer, 0, readBuffer.Length, ReadCallback, readBuffer); 
      } 
      catch(Exception) 
      { 
       //Connection is dead, stop trying to read and wait for a heal to retrigger the read queue 
       return; 
      } 
     } 

然後下面是我的發送方法

private byte[] GetMessageWithLength(byte[] bytes) 
     { 
      //Combine the msg length to the msg 
      byte[] length = BitConverter.GetBytes(bytes.Length); 
      var msg = new byte[length.Length + bytes.Length]; 
      Buffer.BlockCopy(length, 0, msg, 0, length.Length); 
      Buffer.BlockCopy(bytes, 0, msg, length.Length, bytes.Length); 
      return msg; 
     } 

public override bool Send(byte[] bytes) 
     { 
      lock (sendQueue) 
      { 
       sendQueue.Enqueue(bytes); 
       Interlocked.Increment(ref sendQueueSize); 
      } 
      if (!mClient.Connected) 
      { 
       if (Connect()) 
       { 
        RaiseConnectionChanged(this, true, Localisation.TCPConnectionEstablished); 
       } 
       else 
       { 
        RaiseConnectionChanged(this, false, (bytes.Length > 0 ? Localisation.TCPMessageFailed : Localisation.TCPMessageConnectionLost)); 
       } 
      } 

      try 
      { 
       NetworkStream networkStream = mClient.GetStream(); 

       lock (sendQueue) 
       { 
        if (sendQueue.Count == 0) 
        { 
         return true; 
        } 
        bytes = sendQueue.Dequeue(); 
       } 
       var msg = GetMessageWithLength(bytes); 
       //Start async write operation 
       networkStream.BeginWrite(msg, 0, msg.Length, WriteCallback, null); 
      } 
      catch (Exception ex) 
      { 
       RaiseConnectionChanged(this, false, (bytes.Length > 0 ? Localisation.TCPMessageFailed : Localisation.TCPMessageConnectionLost)); 
      } 
      return true; 
     } 

     /// <summary> 
     /// Callback for Write operation 
     /// </summary> 
     /// <param name="result">The AsyncResult object</param> 
     private void WriteCallback(IAsyncResult result) 
     { 
      try 
      { 
       NetworkStream networkStream = mClient.GetStream(); 
       while (sendQueue.Count > 0) 
       { 
        byte[] bytes; 
        lock (sendQueue) 
        { 
         if (sendQueue.Count == 0) 
         { 
          break; 
         } 
         bytes = sendQueue.Dequeue(); 
        } 
        var msg = GetMessageWithLength(bytes); 
        networkStream.Write(msg, 0, msg.Length); 
        Interlocked.Decrement(ref sendQueueSize); 
       } 
       networkStream.EndWrite(result); 
       mLastPacketSentAt = Environment.TickCount; 
       Interlocked.Decrement(ref sendQueueSize); 
      } 
      catch (Exception ex) 
      { 
       RaiseConnectionChanged(this, false, Localisation.TCPMessageConnectionLost); 
      } 
     } 

不過啊,在某些時候,當我強調的測試系統(比如500級左右的客戶端一次發送大量的消息),我注意到每400萬隻可能有一個數據包沒有收到。我不確定問題在於發送還是接收,這就是爲什麼我包含這兩種方法。但是我會指出,如果我選擇從客戶端發送另一個數據包,它仍然會正確發送和接收數據,所以它不僅僅是排隊等等。

任何人都可以看到我失蹤的東西嗎?

+0

哦,而且我想在所有的漁獲量增加了console.writeline(「一些錯誤happend」),沒有喜悅 – Ash

+0

此外,連接()如果已連接 – Ash

回答

1

這兩個讀取循環(例如while (readCount < length))是越野車。你總是閱讀零偏移。你應該閱讀不斷增加的偏移量。

這會導致覆蓋已讀取的數據。

此外,我不確定混合同步和異步讀取是否是個好主意。您以這種方式失去了異步代碼的好處,並且仍然需要處理回調等。我認爲你應該決定一種風格並堅持下去。

+0

那是因爲流可能僅具有方法只返回true讀2個字節,但我知道數據包的長度被寫入4個字節,所以我需要繼續讀取,直到我有一個完整的4字節的數據已被讀取 – Ash

+1

據我所知。你正在閱讀緩衝區的開頭。如果碰巧讀取兩個雙字節塊((1,2),(3,4)),則緩衝區將包含(3,4,0,0)。這是一個錯誤。 – usr

+0

也即時混淆你的意思是他們是越野車? – Ash