2017-03-17 17 views
0

我遇到了命名管道的問題。如果說30個客戶端管道都試圖同時連接到本地管道服務器,在4核心機器上,則會發生超時或信號量超時。有時候,最長的時間,一個客戶端需要一秒鐘才能獲得連接。再下一秒等等。我認爲本地管道訪問應該是快速的。爲什麼30個客戶端 - 即使100個客戶端需要相同的時間 - 也只需要1000毫秒就能建立一個連接?爲什麼命名管道意外地長時間連接到本地管道服務器?

using System; 
using System.Diagnostics; 
using System.IO.Pipes; 
using System.Security.AccessControl; 
using System.Threading; 
using System.Threading.Tasks; 

namespace PipeStress 
{ 
    class Program 
    { 
     public static PipeSecurity CreatePipeSecurity() 
     { 
      PipeSecurity ps; 
      using (var seedPipe = new NamedPipeServerStream("{DDAB520F-5104-48D1-AA65-7AEF568E0045}", 
       PipeDirection.InOut, 1, PipeTransmissionMode.Message, PipeOptions.None, 1000, 1000)) 
      { 
       ps = seedPipe.GetAccessControl(); 
      } 

      var sid = new System.Security.Principal.SecurityIdentifier(
       System.Security.Principal.WellKnownSidType.BuiltinUsersSid, null); 

      ps.AddAccessRule(new PipeAccessRule(sid, PipeAccessRights.ReadWrite | 
       PipeAccessRights.CreateNewInstance | PipeAccessRights.ChangePermissions, 
       AccessControlType.Allow)); 

      sid = new System.Security.Principal.SecurityIdentifier(
       System.Security.Principal.WellKnownSidType.LocalServiceSid, null); 

      ps.AddAccessRule(new PipeAccessRule(sid, PipeAccessRights.ReadWrite, 
       AccessControlType.Allow)); 

      return ps; 
     } 
     static void Main(string[] args) 
     { 
      Task.Run(() => RunPipeServer()); 

      for (var i = (uint) 0; i < 30; i++) 
      { 
       var index = i; 
       //Thread.Sleep(100); 
       Task.Run(() => RunPipeClient(index)); 
      } 

      Console.ReadLine(); 
     } 

     private const string PipeName = "{6FDABBF8-BFFD-4624-A67B-3211ED7EF0DC}"; 

     static void RunPipeServer() 
     { 
      try 
      { 
       var stw = new Stopwatch(); 

       while (true) 
       { 
        stw.Restart(); 

        var pipeServer = new NamedPipeServerStream(PipeName, PipeDirection.InOut, 
         NamedPipeServerStream.MaxAllowedServerInstances, 
         PipeTransmissionMode.Message, PipeOptions.Asynchronous, 4 * 1024, 4 * 1024, 
         CreatePipeSecurity()); 
        try 
        { 
         var pipe = pipeServer; 
         pipeServer.WaitForConnection(); 
         Console.WriteLine(stw.ElapsedMilliseconds + "ms"); 


         Task.Run(() => HandleClient(pipe)); 
        } 
        catch (Exception ex) 
        { 
         pipeServer.Dispose(); 
        } 
       } 
      } 
      catch (Exception ex) 
      { 
       Console.WriteLine(ex); 
      } 
     } 

     private static void HandleClient(NamedPipeServerStream pipeServer) 
     { 
      try 
      { 
       try 
       { 
        //Thread.Sleep(100); 
       } 
       finally 
       { 
        pipeServer.Close(); 
       } 
      } 
      finally 
      { 
       pipeServer.Dispose(); 
      } 
     } 

     static void RunPipeClient(uint i) 
     { 
      try 
      { 
       var j = 0; 

       while (true) 
       { 

        using (var pipeClient = new NamedPipeClientStream(".", PipeName, PipeDirection.InOut, PipeOptions.None)) 
        { 
         //Thread.Sleep(100); 

         pipeClient.Connect(5000); 
         try 
         { 
          Console.WriteLine($"{i}, {++j}"); 
          pipeClient.ReadByte(); 
         } 
         finally 
         { 
          pipeClient.Close(); 
         } 
        } 


       } 
      } 
      catch (Exception ex) 
      { 
       Console.WriteLine(ex); 
      } 
     } 
    } 
} 

回答

3

當向任何服務器添加負載時,會出現一些延遲。但是,在您的示例中,延遲時間恰好在1秒內發生,這種情況既過分又顯着。有序性對於發生的事情是非常重要的線索。 :)

實際上,您看到的延遲是由於線程池中創建新線程的延遲。另一部分證據是事實上,實際上,前幾個操作立即完成。等待時間僅在此之後纔開始發生,這與線程池已用完線程完全一致,並等待線程池的限制以允許創建新線程來爲請求提供服務。這種限制會限制新線程的創建,令人驚訝! :),每秒一次。

解決此問題的一種方法是增加線程池中的最小線程數,以便您擁有所有您需要的線程。這可以通過呼叫ThreadPool.SetMinThreads()來完成。但實際上,爲每個客戶端(和服務器)分配一個線程池線程是浪費的。最好是異步使用API​​,並讓.NET管理I/O。線程池線程仍然被使用,但只有在實際需要時纔會使用,即I/O操作實際完成時,並且僅用於處理該完成。首先,您將需要更少的線程,並且隨着線程需求的增加,線程池將盡快達到平衡。

這裏是你的代碼的版本說明,你會如何做到這一點(我完全刪除了CreatePipeSecurity()方法,因爲它不會出現在與您要問的問題,任何方式):

static void Main(string[] args) 
    { 
     CancellationTokenSource tokenSource = new CancellationTokenSource(); 
     List<Task> tasks = new List<Task>(); 

     tasks.Add(RunPipeServer(tokenSource.Token)); 

     for (var i = (uint)0; i < 30; i++) 
     { 
      var index = i; 
      tasks.Add(RunPipeClient(index, tokenSource.Token)); 
     } 

     Console.ReadLine(); 
     tokenSource.Cancel(); 

     Task.WaitAll(tasks.ToArray()); 
    } 

    private const string PipeName = "{6FDABBF8-BFFD-4624-A67B-3211ED7EF0DC}"; 

    static async Task RunPipeServer(CancellationToken token) 
    { 
     try 
     { 
      var stw = new Stopwatch(); 
      int clientCount = 0; 

      while (!token.IsCancellationRequested) 
      { 
       stw.Restart(); 

       var pipeServer = new NamedPipeServerStream(PipeName, PipeDirection.InOut, 
        NamedPipeServerStream.MaxAllowedServerInstances, 
        PipeTransmissionMode.Message, PipeOptions.Asynchronous); 
       try 
       { 
        token.Register(() => pipeServer.Close()); 
        await Task.Factory.FromAsync(pipeServer.BeginWaitForConnection, pipeServer.EndWaitForConnection, null); 
        clientCount++; 
        Console.WriteLine($"server connection #{clientCount}. {stw.ElapsedMilliseconds} ms"); 

        HandleClient(pipeServer); 
       } 
       catch (Exception ex) 
       { 
        Console.WriteLine("RunPipeServer exception: " + ex.Message); 
        pipeServer.Dispose(); 
       } 
      } 
     } 
     catch (Exception ex) 
     { 
      Console.WriteLine("RunPipeServer exception: " + ex.Message); 
      Console.WriteLine(ex); 
     } 
    } 

    // Left this method synchronous, as in your example it does almost nothing 
    // in this example. You might want to make this "async Task..." as well, if 
    // you wind up having this method do anything interesting. 
    private static void HandleClient(NamedPipeServerStream pipeServer) 
    { 
     pipeServer.Close(); 
    } 

    static async Task RunPipeClient(uint i, CancellationToken token) 
    { 
     try 
     { 
      var j = 0; 

      while (!token.IsCancellationRequested) 
      { 
       using (var pipeClient = new NamedPipeClientStream(".", PipeName, PipeDirection.InOut, PipeOptions.None)) 
       { 
        pipeClient.Connect(5000); 
        try 
        { 
         Console.WriteLine($"connected client {i}, connection #{++j}"); 
         await pipeClient.ReadAsync(new byte[1], 0, 1); 
        } 
        finally 
        { 
         pipeClient.Close(); 
        } 
       } 
      } 

      Console.WriteLine($"client {i} exiting normally"); 
     } 
     catch (Exception ex) 
     { 
      Console.WriteLine($"RunPipeClient({i}) exception: {ex.Message}"); 
     } 
    } 
+0

非常感謝您的回覆。這非常有幫助和啓發。 – PieterB

相關問題