2016-08-01 23 views
1

我試圖建立一個命令行聊天室,服務器正在處理連接並將來自一個客戶端的輸入重複回到所有其他客戶端。 目前服務器能夠接受來自多個客戶端的輸入,但只能將信息單獨發回給這些客戶端。我認爲我的問題是,每個連接正在一個單獨的線程上處理。我將如何允許線程彼此進行通信或能夠將數據發送到每個線程?使用多線程通過NetworkStream發送數據

Server代碼:

namespace ConsoleApplication 
{ 


    class TcpHelper 
    { 


     private static object _lock = new object(); 
     private static List<Task> _connections = new List<Task>(); 


     private static TcpListener listener { get; set; } 
     private static bool accept { get; set; } = false; 

     private static Task StartListener() 
     { 
      return Task.Run(async() => 
      { 
       IPAddress address = IPAddress.Parse("127.0.0.1"); 
       int port = 5678; 
       listener = new TcpListener(address, port); 

       listener.Start(); 

       Console.WriteLine($"Server started. Listening to TCP clients at 127.0.0.1:{port}"); 

       while (true) 
       { 
        var tcpClient = await listener.AcceptTcpClientAsync(); 
        Console.WriteLine("Client has connected"); 
        var task = StartHandleConnectionAsync(tcpClient); 
        if (task.IsFaulted) 
         task.Wait(); 
       } 
      }); 
     } 

     // Register and handle the connection 
     private static async Task StartHandleConnectionAsync(TcpClient tcpClient) 
     { 
      // start the new connection task 
      var connectionTask = HandleConnectionAsync(tcpClient); 



      // add it to the list of pending task 
      lock (_lock) 
       _connections.Add(connectionTask); 

      // catch all errors of HandleConnectionAsync 
      try 
      { 
       await connectionTask; 

      } 
      catch (Exception ex) 
      { 
       // log the error 
       Console.WriteLine(ex.ToString()); 
      } 
      finally 
      { 
       // remove pending task 
       lock (_lock) 
        _connections.Remove(connectionTask); 
      } 
     } 






     private static async Task HandleConnectionAsync(TcpClient client) 
     { 

      await Task.Yield(); 


      { 
       using (var networkStream = client.GetStream()) 
       { 

        if (client != null) 
        { 
         Console.WriteLine("Client connected. Waiting for data."); 



         StreamReader streamreader = new StreamReader(networkStream); 
         StreamWriter streamwriter = new StreamWriter(networkStream); 

         string clientmessage = ""; 
         string servermessage = ""; 


         while (clientmessage != null && clientmessage != "quit") 
         { 
          clientmessage = await streamreader.ReadLineAsync(); 
          Console.WriteLine(clientmessage); 
          servermessage = clientmessage; 
          streamwriter.WriteLine(servermessage); 
          streamwriter.Flush(); 


         } 
         Console.WriteLine("Closing connection."); 
         networkStream.Dispose(); 
        } 
       } 

      } 

     } 
     public static void Main(string[] args) 
     { 
      // Start the server 

      Console.WriteLine("Hit Ctrl-C to close the chat server"); 
      TcpHelper.StartListener().Wait(); 

     } 

    } 

} 

客戶端代碼:

namespace Client2 
{ 
    public class Program 
    { 

     private static void clientConnect() 
     { 
      TcpClient socketForServer = new TcpClient(); 
      bool status = true; 
      string userName; 
      Console.Write("Input Username: "); 
      userName = Console.ReadLine(); 

      try 
      { 
       IPAddress address = IPAddress.Parse("127.0.0.1"); 
       socketForServer.ConnectAsync(address, 5678); 
       Console.WriteLine("Connected to Server"); 
      } 
      catch 
      { 
       Console.WriteLine("Failed to Connect to server{0}:999", "localhost"); 
       return; 
      } 
      NetworkStream networkStream = socketForServer.GetStream(); 
      StreamReader streamreader = new StreamReader(networkStream); 
      StreamWriter streamwriter = new StreamWriter(networkStream); 
      try 
      { 
       string clientmessage = ""; 
       string servermessage = ""; 
       while (status) 
       { 
        Console.Write(userName + ": "); 
        clientmessage = Console.ReadLine(); 
        if ((clientmessage == "quit") || (clientmessage == "QUIT")) 
        { 
         status = false; 
         streamwriter.WriteLine("quit"); 
         streamwriter.WriteLine(userName + " has left the conversation"); 
         streamwriter.Flush(); 

        } 
        if ((clientmessage != "quit") && (clientmessage != "quit")) 
        { 
         streamwriter.WriteLine(userName + ": " + clientmessage); 
         streamwriter.Flush(); 
         servermessage = streamreader.ReadLine(); 
         Console.WriteLine("Server:" + servermessage); 
        } 
       } 
      } 
      catch 
      { 
       Console.WriteLine("Exception reading from the server"); 
      } 
      streamreader.Dispose(); 
      networkStream.Dispose(); 
      streamwriter.Dispose(); 
     } 
     public static void Main(string[] args) 
     { 
      clientConnect(); 
     } 
    } 
} 

回答

2

最主要的錯在你的代碼是,你不嘗試發送從一個客戶機接收到所連接的其他客戶端的數據。您的服務器上有_connections列表,但存儲在列表中的唯一東西是連接的Task對象,您甚至不會對這些對象執行任何操作。

相反,您應該自己維護一個連接列表,以便當您從一個客戶端收到消息時,可以將該消息轉發給其他客戶端。

至少應該是List<TcpClient>,但是因爲您使用的是StreamReaderStreamWriter,所以您還需要初始化並將這些對象存儲在列表中。另外,你應該包含一個客戶端標識符。一個明顯的選擇是客戶端名稱(即用戶輸入的名稱),但是您的示例在聊天協議中沒有提供任何機制來傳輸該標識作爲連接初始化的一部分,所以在我的示例(下面)我只是使用一個簡單的整數值。

有你發佈的代碼一些其他的違規行爲,如:

  • 開始在一個全新的線程任務,只是爲了執行該讓你啓動異步操作的點了幾個語句。在我的例子中,我只是省略了代碼的Task.Run()部分,因爲它不是必需的。
  • 當它返回IsFaulted時檢查連接特定的任務。由於這個Task對象返回時實際上不可能發生任何I/O,因此該邏輯幾乎沒有用處。對Wait()的調用將拋出異常,該異常將傳播到主線程的Wait()調用,終止服務器。但是,如果出現任何其他錯誤,您不會終止服務器,所以目前尚不清楚您爲什麼要這麼做。
  • 有一個虛假的電話Task.Yield()。我不知道你想在那裏完成什麼,但不管它是什麼,這個聲明沒有用。我只是刪除它。
  • 在您的客戶端代碼中,您只會在發送數據時嘗試從服務器接收數據。這是非常錯誤的;您希望客戶能夠及時響應,並在收到數據後立即收到數據。在我的版本中,我包含了一個簡單的小匿名方法,立即調用該方法來啓動一個單獨的消息接收循環,該循環將與主用戶輸入循環異步並行執行。
  • 同樣在客戶端代碼中,您正在發送「&hellip; has left&hellip;」消息將導致服務器關閉連接的「退出」消息。這意味着服務器永遠不會收到「......剩下的......」信息。我顛倒了消息的順序,以便「退出」始終是客戶端發送的最後一件事情。

我的版本是這樣的:

服務器:

class TcpHelper 
{ 
    class ClientData : IDisposable 
    { 
     private static int _nextId; 

     public int ID { get; private set; } 
     public TcpClient Client { get; private set; } 
     public TextReader Reader { get; private set; } 
     public TextWriter Writer { get; private set; } 

     public ClientData(TcpClient client) 
     { 
      ID = _nextId++; 
      Client = client; 

      NetworkStream stream = client.GetStream(); 

      Reader = new StreamReader(stream); 
      Writer = new StreamWriter(stream); 
     } 

     public void Dispose() 
     { 
      Writer.Close(); 
      Reader.Close(); 
      Client.Close(); 
     } 
    } 

    private static readonly object _lock = new object(); 
    private static readonly List<ClientData> _connections = new List<ClientData>(); 

    private static TcpListener listener { get; set; } 
    private static bool accept { get; set; } 

    public static async Task StartListener() 
    { 
     IPAddress address = IPAddress.Any; 
     int port = 5678; 
     listener = new TcpListener(address, port); 

     listener.Start(); 

     Console.WriteLine("Server started. Listening to TCP clients on port {0}", port); 

     while (true) 
     { 
      var tcpClient = await listener.AcceptTcpClientAsync(); 
      Console.WriteLine("Client has connected"); 
      var task = StartHandleConnectionAsync(tcpClient); 
      if (task.IsFaulted) 
       task.Wait(); 
     } 
    } 

    // Register and handle the connection 
    private static async Task StartHandleConnectionAsync(TcpClient tcpClient) 
    { 
     ClientData clientData = new ClientData(tcpClient); 

     lock (_lock) _connections.Add(clientData); 

     // catch all errors of HandleConnectionAsync 
     try 
     { 
      await HandleConnectionAsync(clientData); 
     } 
     catch (Exception ex) 
     { 
      // log the error 
      Console.WriteLine(ex.ToString()); 
     } 
     finally 
     { 
      lock (_lock) _connections.Remove(clientData); 
      clientData.Dispose(); 
     } 
    } 

    private static async Task HandleConnectionAsync(ClientData clientData) 
    { 
     Console.WriteLine("Client connected. Waiting for data."); 

     string clientmessage; 

     while ((clientmessage = await clientData.Reader.ReadLineAsync()) != null && clientmessage != "quit") 
     { 
      string message = "From " + clientData.ID + ": " + clientmessage; 

      Console.WriteLine(message); 

      lock (_lock) 
      { 
       // Locking the entire operation ensures that a) none of the client objects 
       // are disposed before we can write to them, and b) all of the chat messages 
       // are received in the same order by all clients. 
       foreach (ClientData recipient in _connections.Where(r => r.ID != clientData.ID)) 
       { 
        recipient.Writer.WriteLine(message); 
        recipient.Writer.Flush(); 
       } 
      } 
     } 
     Console.WriteLine("Closing connection."); 
    } 
} 

客戶:

class Program 
{ 
    private const int _kport = 5678; 

    private static async Task clientConnect() 
    { 
     IPAddress address = IPAddress.Loopback; 
     TcpClient socketForServer = new TcpClient(); 
     string userName; 
     Console.Write("Input Username: "); 
     userName = Console.ReadLine(); 

     try 
     { 
      await socketForServer.ConnectAsync(address, _kport); 
      Console.WriteLine("Connected to Server"); 
     } 
     catch (Exception e) 
     { 
      Console.WriteLine("Failed to Connect to server {0}:{1}", address, _kport); 
      return; 
     } 


     using (NetworkStream networkStream = socketForServer.GetStream()) 
     { 
      var readTask = ((Func<Task>)(async() => 
      { 
       using (StreamReader reader = new StreamReader(networkStream)) 
       { 
        string receivedText; 

        while ((receivedText = await reader.ReadLineAsync()) != null) 
        { 
         Console.WriteLine("Server:" + receivedText); 
        } 
       } 
      }))(); 

      using (StreamWriter streamwriter = new StreamWriter(networkStream)) 
      { 
       try 
       { 
        while (true) 
        { 
         Console.Write(userName + ": "); 
         string clientmessage = Console.ReadLine(); 
         if ((clientmessage == "quit") || (clientmessage == "QUIT")) 
         { 
          streamwriter.WriteLine(userName + " has left the conversation"); 
          streamwriter.WriteLine("quit"); 
          streamwriter.Flush(); 
          break; 
         } 
         else 
         { 
          streamwriter.WriteLine(userName + ": " + clientmessage); 
          streamwriter.Flush(); 
         } 
        } 

        await readTask; 
       } 
       catch (Exception e) 
       { 
        Console.WriteLine("Exception writing to server: " + e); 
        throw; 
       } 
      } 
     } 
    } 

    public static void Main(string[] args) 
    { 
     clientConnect().Wait(); 
    } 
} 

還有很多你需要去努力。您可能想要在服務器端實現適當的聊天用戶名初始化。至少,對於真實世界的代碼,您希望進行更多的錯誤檢查,並確保客戶端ID可靠地生成(如果您只需要正面的ID值,則不能超過2^31-1在它回滾到0之前的連接)。

我還做了其他一些非必要的小改動,例如使用IPAddress.AnyIPAddress.Loopback的值而不是解析字符串,並且通常簡化並清理代碼。此外,我目前還沒有使用C#6編譯器,所以我更改了使用C#6功能的代碼,以便它可以使用C#5進行編譯。

要做一個完整的聊天服務器,你仍然有你的工作切斷你。但我希望上述讓你回到正確的軌道上。

+0

WOW。很有幫助。是的,我大部分時間都是用Google搜索,把我發現的東西拼湊在一起,並排除任何錯誤。你真的超越了我,我非常感激。我曾以爲我需要存儲客戶端,但我不確定這是否多餘。非常感謝你清理這個! – hereswilson

+0

@hereswilson:高興地幫忙。請注意,以上只是您如何做事的一個例子。這不是你如何做的最後一句話。例如,您可以選擇甚至將消息發送回發送它的客戶端(簡化客戶端枚舉)。另外請注意,我已經更改了中繼部分的同步,同步整個操作,而不是隻獲取要發送給客戶端的部分。我決定我更喜歡這樣(因爲代碼中的評論中描述的原因)。 –