2013-01-10 58 views
1

我正在接受來自不同客戶端的多個tcp連接的c#應用程序(.net 4)。有一個接受套接字的單個tcp偵聽器。雙工通信黑白節點。使用Networkstream.Write方法發送數據並使用Networkstream.read方法讀取數據。對於每個tcp連接,創建一個單獨的線程。網絡流寫入被阻止

問題是,幾天前我們注意到其中一個客戶端停止了20分鐘的數據讀取(由於bug)。由於連接沒有中斷,服務器上沒有(IO)異常。但是,我們注意到其他客戶的數據也沒有進行。 20分鐘後,該客戶再次開始接收數據,不久其他客戶也開始接收數據。

我知道網絡流的寫入方法是一種阻塞方法,我們沒有使用任何超時。因此,寫入已被阻止(描述爲here)。但據我瞭解,每個tcp連接必須有一個單獨的寫入緩衝區,或者有更多的東西在播放。可以在TCP連接發送阻塞,在同一個應用程序中影響其他TCP連接嗎?

這裏是寫操作的僞代碼。對於每個連接,單獨的線程都有一個獨立的傳出隊列進程。

public class TCPServerListener : baseConnection 
{ 

    private readonly int _Port; 
    private TcpListener _tcpListener; 
    private Thread _thread; 
    private List<TcpClientData> _tcpClientDataList = new List<TcpClientData>(); 
    private long _messageDiscardTimeout; 
    private bool LoopForClientConnection = true; 

    public TCPServerListener(int port, ThreadPriority threadPriority) 
    { 
     try 
     { 
      // init property 
     } 
     catch (Exception ex) 
     { 
      // log 
     } 
    } 

    public void SendMessageToAll(int type) 
    { 
     base.EnqueueMessageToSend(type, _tcpClientDataList); 
    } 
    public void SendMessageToList(int type, IList<TcpClient> tcpClientList) 
    { 
     base.EnqueueMessageToSend(type, tcpClientList); 
    } 
    public void SendMessage(int type, TcpClient tcpClient) 
    { 
     base.EnqueueMessageToSend(type, tcpClient); 
    } 



    private void AcceptClientConnections() 
    { 
     while (LoopForClientConnection) 
     { 
      try 
      { 
       Socket socket = _tcpListener.AcceptSocket(); 
       TcpClientData tcpClientData = new TcpClientData(); 
       tcpClientData.tcpClientThread = new Thread(new ParameterizedThreadStart(StartAsync)); 
       tcpClientData.tcpClientThread.Priority = _threadPriority; 
       tcpClientData.tcpClientThread.IsBackground = true; 
       tcpClientData.tcpClientThread.Name = "CD" + tcpClientData.tcpClientThread.ManagedThreadId; 
       tcpClientData.tcpClient = new TcpClient(); 
       tcpClientData.tcpClient.Client = socket; 
       _tcpClientDataList.Add(tcpClientData); 
       tcpClientData.tcpClientThread.Start(tcpClientData.tcpClient); 
      } 
      catch (ThreadAbortException ex) 
      { 
       //log 

      } 
      catch (Exception ex) 
      { 
       //log 
      } 
     } 
    } 




    public override void Start() 
    { 
     base.Start(); 
     _tcpListener = new TcpListener(System.Net.IPAddress.Any, _Port); 

     _thread = new Thread(AcceptClientConnections); 
     _thread.Priority = _threadPriority; 
     _thread.IsBackground = true; 

     _tcpListener.Start(); 
     _thread.Start(); 
    } 

    public override void Stop() 
    { 
     // stop listener and terminate threads 
    } 
} 


public class baseConnection 
{ 
    private Thread _InCommingThread; 
    private Thread _OutGoingThread; 
    protected ThreadPriority _threadPriority; 
    protected BlockingCollection<MessageReceived> _InComingMessageQueue = new BlockingCollection<MessageReceived>(); 
    protected BlockingCollection<MessageToSend> _OutgoingMessageQueue = new BlockingCollection<MessageToSend>(); 

    public void StartAsync(Object oTcpClient) 
    { 
     TcpClient tcpClient = oTcpClient as TcpClient; 
     if (tcpClient == null) 
      return; 

     using (tcpClient) 
     { 
      using (NetworkStream stream = tcpClient.GetStream()) 
      { 
       stream.ReadTimeout = Timeout.Infinite; 
       stream.WriteTimeout = Timeout.Infinite; 

       BinaryReader bodyReader = new BinaryReader(stream); 

       while (tcpClient.Connected) 
       { 
        try 
        { 
         int messageType = bodyReader.ReadInt32(); 

         // checks to verify messages 

         // enqueue message in incoming queue 
         _InComingMessageQueue.Add(new MessageReceived(messageType, tcpClient)); 
        } 
        catch (EndOfStreamException ex) 
        { 
         // log 
         break; 
        } 
        catch (Exception ex) 
        { 
         // log 
         Thread.Sleep(100); 
        } 
       } 
       //RaiseDisconnected(tcpClient); 
      } 
     } 
    } 


    public virtual void Start() 
    { 
     _InCommingThread = new Thread(HandleInCommingMessnge); 
     _InCommingThread.Priority = _threadPriority; 
     _InCommingThread.IsBackground = true; 
     _InCommingThread.Start(); 

     _OutGoingThread = new Thread(HandleOutgoingQueue); 
     _OutGoingThread.Priority = _threadPriority; 
     _OutGoingThread.IsBackground = true; 
     _OutGoingThread.Start(); 
    } 


    public virtual void Stop() 
    { 
     // stop the threads and free up resources 
    } 

    protected void EnqueueMessageToSend(int type, List<TcpClientData> tcpClientDataList) 
    { 
     tcpClientDataList.ForEach(x => _OutgoingMessageQueue.Add(new MessageToSend(type, x.tcpClient))); 
    } 
    protected void EnqueueMessageToSend(int type, IList<TcpClient> tcpClientList) 
    { 
     foreach (TcpClient tcpClient in tcpClientList) 
     { 
      _OutgoingMessageQueue.Add(new MessageToSend(type, tcpClient)); 
     } 
    } 
    protected void EnqueueMessageToSend(int type, TcpClient tcpClient) 
    { 
     _OutgoingMessageQueue.Add(new MessageToSend(type, tcpClient)); 
    } 


    private void HandleOutgoingQueue() 
    { 
     while (true) 
     { 
      try 
      { 

       MessageToSend message = _OutgoingMessageQueue.Take(); 

       if (message.tcpClient.Connected) 
       { 
        BinaryWriter writer = new BinaryWriter(message.tcpClient.GetStream()); 
        writer.Write(message.type); 
       } 
      } 
      catch (ThreadAbortException ex) 
      { 
       // log 
       return; 
      } 
      catch (Exception ex) 
      { 
       //_logger.Error(ex.Message, ex); 
      } 
     } 
    } 

    private void HandleInCommingMessnge() 
    { 
     while (true) 
     { 
      try 
      { 
       MessageReceived messageReceived = _InComingMessageQueue.Take(); 

       // handle message 
      } 
      catch (ThreadAbortException ex) 
      { 
       // log 
       return; 
      } 
      catch (Exception ex) 
      { 
       // log 
       //_logger.Error(ex.Message, ex); 
      } 
     } 
    } 

    public class MessageReceived 
    { 
     public MessageReceived(int type, TcpClient tcpClient) 
     { 
      this.tcpClient = tcpClient; 
      this.type = type; 
     } 

     public int type; 
     public TcpClient tcpClient; 
    } 

    public class MessageToSend 
    { 
     public MessageToSend(int type, TcpClient tcpClient) 
     { 
      this.tcpClient = tcpClient; 
      this.type = type; 
     } 

     public int type; 
     public TcpClient tcpClient; 
    } 

    public class TcpClientData 
    { 
     public Thread tcpClientThread; 
     public TcpClient tcpClient; 
    } 
} 
+1

它不應該阻塞和緩衝區是相互獨立的 - 線程在寫入方法周圍使用某種'lock'嗎? –

+0

我已經添加了一些代碼示例,並且沒有鎖。謝謝。 –

回答

1

你提到的每個連接一個單獨的線程創建的,但你所示的代碼似乎能出隊的消息中任何連接。

如果此代碼在多個線程上運行,則當每個線程正試圖向阻塞連接發送消息時,程序將立即阻止。如果此循環在多個線程上運行,則可能會遇到的另一個問題是消息可能無法以相同連接的正確順序到達。

+0

我發佈了更多代碼以提供更好的圖片。 –

+1

如果您查看'HandleOutgoingQueue()'方法 - 它將在單個線程上運行,併爲每個連接的客戶端同步處理傳出消息。當一個Write()被阻塞時,其他連接都不會被寫入。 –