2017-03-07 65 views
2

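Asynchronous reading from a network stream with a large number of packets

In my application, every packet begins with a 2-byte length prefix. However, after a while the application starts receiving lengths that are less than zero. With a synchronous client everything works correctly, but it is too slow. I am 100% sure that everything is correct on the server.

Connect: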

public void Connect(IPAddress ip, int port) 
    { 
     tcpClient.Connect(ip, port); 
     stream = tcpClient.GetStream(); 
     byte[] len_buffer = new byte[2]; 
     stream.BeginRead(len_buffer, 0, len_buffer.Length, OnDataRead, len_buffer); 
    } 

OnDataRead:

private void OnDataRead(IAsyncResult ar) 
    { 
      byte[] len = ar.AsyncState as byte[]; 
      int length = BitConverter.ToInt16(len, 0); 
      byte[] buffer = new byte[length]; 

      int remaining = length; 
      int pos = 0; 
      while (remaining != 0) 
      { 
       int add = stream.Read(buffer, pos, remaining); 
       pos += add; 
       remaining -= add; 
      } 
      Process(buffer); 
      len = new byte[2]; 

      stream.EndRead(ar); 
      stream.BeginRead(len, 0, len.Length, OnDataRead, len); 
    } 
+2

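Even with a buffer of length 2, there is no guarantee that a single call to `Read` (or `BeginRead`) will read 2 bytes. If you need a specific number of bytes, you always have to check the result and issue further reads.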
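
A minimal sketch of the loop that comment describes, assuming a blocking `Stream`. The `ReadExact` helper and its class name are hypothetical and not part of the original post. Unlike the `while (remaining != 0)` loop in the question, it also stops when `Read` returns 0, which is how a closed connection is reported.

```csharp
using System;
using System.IO;

public static class StreamReadSketch
{
    // Hypothetical helper: blocks until exactly `count` bytes are in `buffer`.
    public static void ReadExact(Stream stream, byte[] buffer, int count)
    {
        int pos = 0;
        while (pos < count)
        {
            // Read may return fewer bytes than requested, even for a 2-byte request.
            int read = stream.Read(buffer, pos, count - pos);
            if (read == 0)
                throw new EndOfStreamException("Stream ended before " + count + " bytes arrived.");
            pos += read;
        }
    }
}
```

The same helper would be used for both the 2-byte header and the payload, so neither read can silently come back short.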

+0

You are mixing synchronous `stream.Read` with asynchronous `stream.BeginRead`. That rings some alarm bells...

Answers

1

As far as I can see, you are mixing synchronous and asynchronous calls. That is bad practice.

What you want is something like:

var header = ReadHeader(); // 2 bytes 
var data = ReadData(header.DataSize); 
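
As a rough illustration of that header-then-data shape on a `NetworkStream` (or any other `Stream`), here is a sketch built on `Stream.ReadAsync`. The class and method names are hypothetical, and treating the length as an unsigned 16-bit value in the machine's byte order is an assumption; the original post uses `BitConverter.ToInt16`.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class PacketStreamSketch
{
    // Hypothetical helper: reads exactly `count` bytes,
    // or returns null if the stream ends first.
    private static async Task<byte[]> ReadExactAsync(Stream stream, int count)
    {
        var buffer = new byte[count];
        int pos = 0;
        while (pos < count)
        {
            int read = await stream.ReadAsync(buffer, pos, count - pos);
            if (read == 0)
                return null; // the remote side closed the connection
            pos += read;
        }
        return buffer;
    }

    // Reads length-prefixed packets until the stream ends.
    public static async Task ReadPacketsAsync(Stream stream, Action<byte[]> process)
    {
        while (true)
        {
            byte[] header = await ReadExactAsync(stream, 2);
            if (header == null) return;

            // Assumption: unsigned 16-bit length in the machine's byte order.
            int length = BitConverter.ToUInt16(header, 0);

            byte[] data = await ReadExactAsync(stream, length);
            if (data == null) return;

            process(data);
        }
    }
}
```

Every read in this sketch is asynchronous, so no thread blocks while waiting for the rest of a packet.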

I haven't used NetworkStream myself, but here is an example of my asynchronous SocketReader:

using System;
using System.Diagnostics;
using System.Net.Sockets;

public static class SocketReader 
{ 
    // Keeps reading until count bytes have been read (or the socket is closed).
    private static void DoReadFromSocket(Socket socket, int bytesRead, int count, byte[] buffer, Action<ArraySegment<byte>> endRead) 
    { 
     // Start a BeginReceive. 
     try 
     { 
      socket.BeginReceive(buffer, bytesRead, count - bytesRead, SocketFlags.None, (asyncResult) => 
      { 
       // Get the bytes read. 
       int read = 0; 
       try 
       { 
        // if this goes wrong, the read remains 0 
        read = socket.EndReceive(asyncResult); 
       } 
       catch (ObjectDisposedException) { } 
       catch (Exception exception) 
       { 
        Trace.TraceError(exception.Message); 
       } 


       // if zero bytes received, the socket isn't available anymore. 
       if (read == 0) 
       { 
        endRead(new ArraySegment<byte>(buffer, 0, 0)); 
        return; 
       } 

       // increase the bytesRead, (position within the buffer) 
       bytesRead += read; 

       // if all bytes are read, call the endRead with the buffer. 
       if (bytesRead == count) 
        // All bytes are read. Invoke callback. 
        endRead(new ArraySegment<byte>(buffer, 0, count)); 
       else 
        // if not all bytes received, start another BeginReceive. 
        DoReadFromSocket(socket, bytesRead, count, buffer, endRead); 

      }, null); 
     } 
     catch (Exception exception) 
     { 
      Trace.TraceError(exception.Message); 
      endRead(new ArraySegment<byte>(buffer, 0, 0)); 
     } 
    } 

    public static void ReadFromSocket(Socket socket, int count, Action<ArraySegment<byte>> endRead) 
    { 
     // read from socket, construct a new buffer. 
     DoReadFromSocket(socket, 0, count, new byte[count], endRead); 
    } 

    public static void ReadFromSocket(Socket socket, int count, byte[] buffer, Action<ArraySegment<byte>> endRead) 
    { 
     // if you do have a buffer available, you can pass that one. (this way you do not construct new buffers for receiving and able to reuse buffers) 

     // if the buffer is too small, raise an exception, the caller should check the count and size of the buffer. 
     if (count > buffer.Length) 
      throw new ArgumentOutOfRangeException(nameof(count)); 

     DoReadFromSocket(socket, 0, count, buffer, endRead); 
    } 
} 

Usage:

SocketReader.ReadFromSocket(socket, 2, (headerData) => 
{ 
    if(headerData.Count == 0) 
    { 
     // nothing/closed 
     return; 
    } 

    // Read the length of the data. 
    int length = BitConverter.ToInt16(headerData.Array, headerData.Offset); 

    SocketReader.ReadFromSocket(socket, length, (dataBufferSegment) => 
    { 
     if(dataBufferSegment.Count == 0) 
     { 
      // nothing/closed 
      return; 
     } 

     Process(dataBufferSegment); 

     // extra: if you need a binaryreader.. 
     using(var stream = new MemoryStream(dataBufferSegment.Array, dataBufferSegment.Offset, dataBufferSegment.Count)) 
     using(var reader = new BinaryReader(stream)) 
     { 
      var whatever = reader.ReadInt32(); 
     } 
    }); 
}); 

You can optimize this by passing in your own receive buffer (see the overload).


Continuous receiving (reusing the receive buffer):

public class PacketReader 
{ 
    private byte[] _receiveBuffer = new byte[2]; 

    // This will run until the socket is closed.  
    public void StartReceiving(Socket socket, Action<ArraySegment<byte>> process) 
    { 
     SocketReader.ReadFromSocket(socket, 2, _receiveBuffer, (headerData) => 
     { 
      if(headerData.Count == 0) 
      { 
       // nothing/closed 
       return; 
      } 

      // Read the length of the data. 
      int length = BitConverter.ToInt16(headerData.Array, headerData.Offset); 

      // if the receive buffer is too small, reallocate it. 
      if(_receiveBuffer.Length < length) 
       _receiveBuffer = new byte[length]; 

      SocketReader.ReadFromSocket(socket, length, _receiveBuffer, (dataBufferSegment) => 
      { 
       if(dataBufferSegment.Count == 0) 
       { 
        // nothing/closed 
        return; 
       } 

       try 
       { 
        process(dataBufferSegment); 
       } 
       catch { } 

       StartReceiving(socket, process); 
      }); 
     }); 
    } 
} 

Usage:

private PacketReader _reader; 

public void Start() 
{ 
    _reader = new PacketReader(); 
    _reader.StartReceiving(socket, HandlePacket); 
} 

private void HandlePacket(ArraySegment<byte> packet) 
{ 
    // do stuff..... 
} 
+0

Thanks, I will definitely use your class! The problem was here: `int length = BitConverter.ToInt16(len, 0);`. It should have been `short length`. – Freshek

+0

Storing a `short` in an `int` is not a problem. (The original code had an int as the data-size field.)
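
A small sketch of why the variable type should not change the value: widening a `short` to an `int` preserves it, sign included. It also shows one possible source of negative lengths that is separate from partial reads, namely a 16-bit length whose high bit is set being decoded as signed. The class and method names are hypothetical, and little-endian byte order is assumed for the sample bytes.

```csharp
using System;

public static class LengthPrefixSketch
{
    // Decodes the 2-byte header as a signed value, as the original code does.
    // The implicit short-to-int conversion keeps the value unchanged.
    public static int SignedLength(byte[] header)
    {
        short value = BitConverter.ToInt16(header, 0);
        return value;
    }

    // Decodes the same 2 bytes as an unsigned value (range 0 to 65535).
    public static int UnsignedLength(byte[] header)
    {
        return BitConverter.ToUInt16(header, 0);
    }

    public static void Demo()
    {
        // 0x9C40 = 40000, written low byte first.
        byte[] header = { 0x40, 0x9C };
        if (!BitConverter.IsLittleEndian)
            Array.Reverse(header); // keep the example valid on big-endian machines

        Console.WriteLine(SignedLength(header));   // -25536
        Console.WriteLine(UnsignedLength(header)); // 40000
    }
}
```

Whether this applies here depends on the server's packet sizes, which the thread does not state; it only matters if packets can exceed 32767 bytes.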

+0

I changed `int` to `short` and it magically started working. – Freshek