2016-04-26 44 views
1

我最近遇到了一個問題,試圖播放使用SendAsync方法的開放的WebSocket,收到一個InvalidOperationException消息「發送操作已經在進行中」檢測「忙」的WebSocket

通過挖源代碼WebSocket的類,繁忙狀態在內部追蹤:

internal ChannelState _sendState; 

// Represents the state of a single channel; send and receive have their own states 
internal enum ChannelState { 
    Ready, // this channel is available for transmitting new frames 
    Busy, // the channel is already busy transmitting frames 
    Closed // this channel has been closed 
} 

理想的情況下,如果我要廣播的更新到WebSocket連接,我想提前知道它是忙碌,執行一些其他處理(例如排隊消息)。

這似乎很奇怪,這種狀態被標記爲內部的 - 是它的公共屬性,我可以簡單地檢查

context.WebSocket.SendState == ChannelState.Ready 

什麼是正確的模式SendAsync上的WebSocket會阻止拋出這個異常?

我不願意通過反射破解對該屬性的訪問。

編輯澄清:

的WebSocket.State屬性不會幫助這個情況。該屬性使用此枚舉:

public enum WebSocketState 
{ 
    None, 
    Connecting, 
    Open, 
    CloseSent, 
    CloseReceived, 
    Closed, 
    Aborted 
} 

一旦套接字連接已經打開,該語句將評估爲「真」,無論它是否是忙於發送:

context.WebSocket.State == WebSocketState.Open 
+0

你使用的是'AspNetWebSocket'嗎?如果是這樣,它有一個'國家'屬性你可以檢查。 https://msdn.microsoft.com/en-us/library/system.web.websockets.aspnetwebsocket.state(v=vs.110).aspx – dharms

+0

不幸的是,當套接字爲「忙」時,狀態屬性不會改變 - 它仍然是「開放」。嘗試在忙時啓動「SendAsync」會引發異常。 –

+0

也許修改您的代碼只有一個線程正在執行發送操作。該進程可以針對您的應用程序在發送消息時填充的隊列。 – dharms

回答

0

我實現一個似乎能滿足我需求的解決方案。

基本問題是兩個線程試圖在同一個WebSocket上發送。

我的解決方案有幾個部分,並且依賴於這是在AspNetWebSocketContect下運行的事實,所以我可以使用「Items」字典存儲有關當前連接的屬性。

  1. 「發送」屬性用於跟蹤WebSocket是否忙碌。
  2. 「隊列」屬性是要發送的ArraySegments的列表。
  3. 鎖定WebSocket對象並同步運行發送方法。
  4. 發送完成後處理隊列中的所有項目。
  5. 防止阻塞的方法開始時的產量。

這是我目前使用的開發環境的代碼 - 我會監視,看看它如何擴展:

/// <summary> 
/// Send a message to a specific client. 
/// </summary> 
/// <param name="context"></param> 
/// <param name="buffer"></param> 
/// <returns></returns> 
private static async Task SendMessage(AspNetWebSocketContext context, ArraySegment<byte> buffer) 
{ 
    // Return control to the calling method immediately. 
    await Task.Yield(); 

    // Make sure we have data. 
    if (buffer.Count == 0) 
     return; 

    // The state of the connection is contained in the context Items dictionary. 
    bool sending; 
    lock (context) 
    { 
     // Are we already in the middle of a send? 
     sending = (bool)context.Items["sending"]; 

     // If not, we are now. 
     if (!sending) 
      context.Items["sending"] = true; 
    } 

    if (!sending) 
    { 
     // Lock with a timeout, just in case. 
     if (!Monitor.TryEnter(context.WebSocket, 1000)) 
     { 
      // If we couldn't obtain exclusive access to the socket in one second, something is wrong. 
      await context.WebSocket.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Empty, CancellationToken.None); 
      return; 
     } 

     try 
     { 
      // Send the message synchronously. 
      var t = context.WebSocket.SendAsync(buffer, WebSocketMessageType.Text, true, CancellationToken.None); 
      t.Wait(); 
     } 
     finally 
     { 
      Monitor.Exit(context.WebSocket); 
     } 

     // Note that we've finished sending. 
     lock (context) 
     { 
      context.Items["sending"] = false; 
     } 

     // Handle any queued messages. 
     await HandleQueue(context); 
    } 
    else 
    { 
     // Add the message to the queue. 
     lock (context) 
     { 
      var queue = context.Items["queue"] as List<ArraySegment<byte>>; 
      if (queue == null) 
       context.Items["queue"] = queue = new List<ArraySegment<byte>>(); 
      queue.Add(buffer); 
     } 
    } 
} 

/// <summary> 
/// If there was a message in the queue for this particular web socket connection, send it. 
/// </summary> 
/// <param name="context"></param> 
/// <returns></returns> 
private static async Task HandleQueue(AspNetWebSocketContext context) 
{ 
    var buffer = new ArraySegment<byte>(); 
    lock (context) 
    { 
     // Check for an item in the queue. 
     var queue = context.Items["queue"] as List<ArraySegment<byte>>; 
     if (queue != null && queue.Count > 0) 
     { 
      // Pull it off the top. 
      buffer = queue[0]; 
      queue.RemoveAt(0); 
     } 
    } 

    // Send that message. 
    if (buffer.Count > 0) 
     await SendMessage(context, buffer); 
} 

的幾點思考 - 我對這種做法:

  1. 我相信最好鎖定一個簡單的對象,而不是更復雜的對象,因爲我在上面做的。我不確定使用「context」和「context.WebSocket」對象進行鎖定會產生什麼樣的後果。
  2. 理論上我不應該鎖定WebSocket,因爲我已經在測試「發送」屬性。但是,測試導致WebSocket在幾秒鐘的重負載下無響應。一旦我實現了鎖定模式,這就消失了。
  3. 我用10個併發線程(每個發送1000條消息)通過相同的WebSocket連接做了測試。每條消息都有編號,所以我可以將它們與客戶端進行登錄,並且每條消息都通過了。所以排隊系統似乎工作。