2011-10-10 154 views
16

我試圖將F#中的簡單異步TCP服務器移植到C#4中。服務器接收連接,讀取單個請求並在關閉連接之前回傳一系列響應。WCF性能,延遲和可伸縮性

在C#4中的異步看起來乏味和容易出錯,所以我想我會嘗試使用WCF來代替。這臺服務器不大可能在野外看到1,000個同時發生的請求,所以我認爲吞吐量和延遲是有趣的。

我在C#中編寫了一個最小雙工WCF Web服務和控制檯客戶端。儘管我使用的是WCF而不是原始套接字,但這已經是175行代碼,而原始代碼只有80行。但我更關心的性能和可擴展性:

  • 延遲是154 ×與WCF更糟。
  • 吞吐量爲54 ×與WCF差。
  • TCP處理1000個併發連接,很容易,但WCF扼流圈只是20

首先,我使用的是默認設置一切,所以我想知道如果有什麼我可以調整,以改善這些性能數據?

其次,我想知道是否有人使用WCF這種事情,或者如果它是這個工作的錯誤工具?

下面是在C#中我的WCF服務器:

IService1.cs

[DataContract] 
public class Stock 
{ 
    [DataMember] 
    public DateTime FirstDealDate { get; set; } 
    [DataMember] 
    public DateTime LastDealDate { get; set; } 
    [DataMember] 
    public DateTime StartDate { get; set; } 
    [DataMember] 
    public DateTime EndDate { get; set; } 
    [DataMember] 
    public decimal Open { get; set; } 
    [DataMember] 
    public decimal High { get; set; } 
    [DataMember] 
    public decimal Low { get; set; } 
    [DataMember] 
    public decimal Close { get; set; } 
    [DataMember] 
    public decimal VolumeWeightedPrice { get; set; } 
    [DataMember] 
    public decimal TotalQuantity { get; set; } 
} 

[ServiceContract(CallbackContract = typeof(IPutStock))] 
public interface IStock 
{ 
    [OperationContract] 
    void GetStocks(); 
} 

public interface IPutStock 
{ 
    [OperationContract] 
    void PutStock(Stock stock); 
} 

Service1.svc

<%@ ServiceHost Language="C#" Debug="true" Service="DuplexWcfService2.Stocks" CodeBehind="Service1.svc.cs" %> 

Service1.svc.cs

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Multiple)] 
public class Stocks : IStock 
{ 
    IPutStock callback; 

    #region IStock Members 
    public void GetStocks() 
    { 
    callback = OperationContext.Current.GetCallbackChannel<IPutStock>(); 
    Stock st = null; 
    st = new Stock 
    { 
     FirstDealDate = System.DateTime.Now, 
     LastDealDate = System.DateTime.Now, 
     StartDate = System.DateTime.Now, 
     EndDate = System.DateTime.Now, 
     Open = 495, 
     High = 495, 
     Low = 495, 
     Close = 495, 
     VolumeWeightedPrice = 495, 
     TotalQuantity = 495 
    }; 
    for (int i=0; i<1000; ++i) 
     callback.PutStock(st); 
    } 
    #endregion 
} 

Web.config

<?xml version="1.0"?> 
<configuration> 
    <system.web> 
    <compilation debug="true" targetFramework="4.0" /> 
    </system.web> 
    <system.serviceModel> 
    <services> 
     <service name="DuplexWcfService2.Stocks"> 
     <endpoint address="" binding="wsDualHttpBinding" contract="DuplexWcfService2.IStock"> 
      <identity> 
      <dns value="localhost"/> 
      </identity> 
     </endpoint> 
     <endpoint address="mex" binding="mexHttpBinding" contract="IMetadataExchange"/> 
     </service> 
    </services> 
    <behaviors> 
     <serviceBehaviors> 
     <behavior> 
      <serviceMetadata httpGetEnabled="true"/> 
      <serviceDebug includeExceptionDetailInFaults="true"/> 
     </behavior> 
     </serviceBehaviors> 
    </behaviors> 
    <serviceHostingEnvironment multipleSiteBindingsEnabled="true" /> 
    </system.serviceModel> 
    <system.webServer> 
    <modules runAllManagedModulesForAllRequests="true"/> 
    </system.webServer> 
</configuration> 

這裏是C#WCF客戶端:

Program.cs

[CallbackBehavior(ConcurrencyMode = ConcurrencyMode.Multiple, UseSynchronizationContext = false)] 
class Callback : DuplexWcfService2.IStockCallback 
{ 
    System.Diagnostics.Stopwatch timer; 
    int n; 

    public Callback(System.Diagnostics.Stopwatch t) 
    { 
    timer = t; 
    n = 0; 
    } 

    public void PutStock(DuplexWcfService2.Stock st) 
    { 
    ++n; 
    if (n == 1) 
     Console.WriteLine("First result in " + this.timer.Elapsed.TotalSeconds + "s"); 
    if (n == 1000) 
     Console.WriteLine("1,000 results in " + this.timer.Elapsed.TotalSeconds + "s"); 
    } 
} 

class Program 
{ 
    static void Test(int i) 
    { 
    var timer = System.Diagnostics.Stopwatch.StartNew(); 
    var ctx = new InstanceContext(new Callback(timer)); 
    var proxy = new DuplexWcfService2.StockClient(ctx); 
    proxy.GetStocks(); 
    Console.WriteLine(i + " connected"); 
    } 

    static void Main(string[] args) 
    { 
    for (int i=0; i<10; ++i) 
    { 
     int j = i; 
     new System.Threading.Thread(() => Test(j)).Start(); 
    } 
    } 
} 

這裏是我的異步TCP客戶端和服務器端的代碼在F#:

type AggregatedDeals = 
    { 
    FirstDealTime: System.DateTime 
    LastDealTime: System.DateTime 
    StartTime: System.DateTime 
    EndTime: System.DateTime 
    Open: decimal 
    High: decimal 
    Low: decimal 
    Close: decimal 
    VolumeWeightedPrice: decimal 
    TotalQuantity: decimal 
    } 

let read (stream: System.IO.Stream) = async { 
    let! header = stream.AsyncRead 4 
    let length = System.BitConverter.ToInt32(header, 0) 
    let! body = stream.AsyncRead length 
    let fmt = System.Runtime.Serialization.Formatters.Binary.BinaryFormatter() 
    use stream = new System.IO.MemoryStream(body) 
    return fmt.Deserialize(stream) 
} 

let write (stream: System.IO.Stream) value = async { 
    let body = 
    let fmt = System.Runtime.Serialization.Formatters.Binary.BinaryFormatter() 
    use stream = new System.IO.MemoryStream() 
    fmt.Serialize(stream, value) 
    stream.ToArray() 
    let header = System.BitConverter.GetBytes body.Length 
    do! stream.AsyncWrite header 
    do! stream.AsyncWrite body 
} 

let endPoint = System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 4502) 

let server() = async { 
    let listener = System.Net.Sockets.TcpListener(endPoint) 
    listener.Start() 
    while true do 
    let client = listener.AcceptTcpClient() 
    async { 
     use stream = client.GetStream() 
     let! _ = stream.AsyncRead 1 
     for i in 1..1000 do 
     let aggregatedDeals = 
      { 
      FirstDealTime = System.DateTime.Now 
      LastDealTime = System.DateTime.Now 
      StartTime = System.DateTime.Now 
      EndTime = System.DateTime.Now 
      Open = 1m 
      High = 1m 
      Low = 1m 
      Close = 1m 
      VolumeWeightedPrice = 1m 
      TotalQuantity = 1m 
      } 
     do! write stream aggregatedDeals 
    } |> Async.Start 
} 

let client() = async { 
    let timer = System.Diagnostics.Stopwatch.StartNew() 
    use client = new System.Net.Sockets.TcpClient() 
    client.Connect endPoint 
    use stream = client.GetStream() 
    do! stream.AsyncWrite [|0uy|] 
    for i in 1..1000 do 
    let! _ = read stream 
    if i=1 then lock stdout (fun() -> 
     printfn "First result in %fs" timer.Elapsed.TotalSeconds) 
    lock stdout (fun() -> 
    printfn "1,000 results in %fs" timer.Elapsed.TotalSeconds) 
} 

do 
    server() |> Async.Start 
    seq { for i in 1..100 -> client() } 
    |> Async.Parallel 
    |> Async.RunSynchronously 
    |> ignore 
+4

我想嘗試的第一件事就是切換WCF從wsdualhttp結合nettcp(禁用安全)的東西更具有可比性。 – Brian

+0

又見也許http://www.devproconnections.com/article/net-framework2/concurrency-and-throttling-configurations-for-wcf-services – Brian

+0

好問題。我有你的C#服務在本地運行,但我無法讓F#編譯進行性能比較。我從來沒有讀過F#之前...我需要添加什麼來使它編譯超越剪切和粘貼上面的代碼? – ErnieL

回答

5

要回答你的第二個問題首先,與之相比,WCF總是會有開銷原始插座。但它有一噸的功能(如安全,reliablity,互操作性,多種傳輸協議,跟蹤等)相比,原始套接字,權衡是否接受你是根據你的情況。它看起來像你正在做一些金融交易應用程序和WCF可能是不適合你的情況(雖然我在金融行業是不是有經驗的資格這一點)。

關於第一個問題,而不是託管在客戶端的獨立WCF服務,使客戶端可以是服務本身,並使用netTCP如果可能的話結合雙http綁定嘗試。在服務行爲中調整serviceThrottling元素中的屬性。 .Net 4之前的默認值較低。

25

對於幾乎所有的默認值,WCF都會選擇非常安全的值。這遵循不讓新手開發者自己拍攝的哲學。但是,如果您知道要更改的限制以及要使用的綁定,則可以獲得合理的性能和縮放比例。

在我的核心i5-2400(四核,沒有超線程,3.10 GHz)下面的解決方案將運行1000個客戶端,每個客戶端有1000個回調,平均總運行時間爲20秒。這是20秒內1,000,000次WCF呼叫。

不幸的是我無法讓你的F#程序運行直接比較。如果你在你的盒子上運行我的解決方案,你可以發表一些F#和C#WCF性能比較數字嗎?


免責聲明:以下旨在概念的證明。其中一些設置對生產沒有意義。

我做了什麼:

  • 刪除了雙螺旋結合並有客戶創造他們自己的 服務主機接收回調。這實質上是雙面綁定在底層所做的事情。 (這也是Pratik的 建議)
  • 更改綁定到netTcpBinding。
  • 更改限制值:
    • WCF:maxConcurrentCalls,maxConcurrentSessions, maxConcurrentInstances所有1000
    • TCP binding:MAXCONNECTIONS = 1000
    • 線程池:最小工作線程= 1000,最小的IO線程= 2000
  • 添加了IsOneWay來服務操作

請注意,在此原型中,所有服務和客戶端都位於同一App域中並共享相同的線程池。

我的教訓:

  • 當有一個「無連接可以作出,因爲目標機器積極地拒絕它」客戶端異常
    • 可能的原因:
      1. WCF限制了已達到
      2. TCP限制已達到
      3. 沒有I/O廣告可用於處理該通話。
    • 爲#3的解決方案是要麼:
      1. 增加分鐘IO線程數 - 或 -
      2. 擁有的StockService做了一個工作線程回調(這確實增加總運行時間)
  • 添加IsOneWay將運行時間減半(從40秒到20秒)。

運行在覈心i5-2400上的程序輸出。 注意定時器的使用與原始問題中的不同(請參閱代碼)。

All client hosts open. 
Service Host opened. Starting timer... 
Press ENTER to close the host one you see 'ALL DONE'. 
Client #100 completed 1,000 results in 0.0542168 s 
Client #200 completed 1,000 results in 0.0794684 s 
Client #300 completed 1,000 results in 0.0673078 s 
Client #400 completed 1,000 results in 0.0527753 s 
Client #500 completed 1,000 results in 0.0581796 s 
Client #600 completed 1,000 results in 0.0770291 s 
Client #700 completed 1,000 results in 0.0681298 s 
Client #800 completed 1,000 results in 0.0649353 s 
Client #900 completed 1,000 results in 0.0714947 s 
Client #1000 completed 1,000 results in 0.0450857 s 
ALL DONE. Total number of clients: 1000 Total runtime: 19323 msec 

代碼都在一個控制檯應用程序文件:

using System; 
using System.Collections.Generic; 
using System.ServiceModel; 
using System.Diagnostics; 
using System.Threading; 
using System.Runtime.Serialization; 

namespace StockApp 
{ 
    [DataContract] 
    public class Stock 
    { 
     [DataMember] 
     public DateTime FirstDealDate { get; set; } 
     [DataMember] 
     public DateTime LastDealDate { get; set; } 
     [DataMember] 
     public DateTime StartDate { get; set; } 
     [DataMember] 
     public DateTime EndDate { get; set; } 
     [DataMember] 
     public decimal Open { get; set; } 
     [DataMember] 
     public decimal High { get; set; } 
     [DataMember] 
     public decimal Low { get; set; } 
     [DataMember] 
     public decimal Close { get; set; } 
     [DataMember] 
     public decimal VolumeWeightedPrice { get; set; } 
     [DataMember] 
     public decimal TotalQuantity { get; set; } 
    } 

    [ServiceContract] 
    public interface IStock 
    { 
     [OperationContract(IsOneWay = true)] 
     void GetStocks(string address); 
    } 

    [ServiceContract] 
    public interface IPutStock 
    { 
     [OperationContract(IsOneWay = true)] 
     void PutStock(Stock stock); 
    } 

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)] 
    public class StocksService : IStock 
    { 
     public void SendStocks(object obj) 
     { 
      string address = (string)obj; 
      ChannelFactory<IPutStock> factory = new ChannelFactory<IPutStock>("CallbackClientEndpoint"); 
      IPutStock callback = factory.CreateChannel(new EndpointAddress(address)); 

      Stock st = null; st = new Stock 
      { 
       FirstDealDate = System.DateTime.Now, 
       LastDealDate = System.DateTime.Now, 
       StartDate = System.DateTime.Now, 
       EndDate = System.DateTime.Now, 
       Open = 495, 
       High = 495, 
       Low = 495, 
       Close = 495, 
       VolumeWeightedPrice = 495, 
       TotalQuantity = 495 
      }; 

      for (int i = 0; i < 1000; ++i) 
       callback.PutStock(st); 

      //Console.WriteLine("Done calling {0}", address); 

      ((ICommunicationObject)callback).Shutdown(); 
      factory.Shutdown(); 
     } 

     public void GetStocks(string address) 
     { 
      /// WCF service methods execute on IO threads. 
      /// Passing work off to worker thread improves service responsiveness... with a measurable cost in total runtime. 
      System.Threading.ThreadPool.QueueUserWorkItem(new System.Threading.WaitCallback(SendStocks), address); 

      // SendStocks(address); 
     } 
    } 

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession)] 
    public class Callback : IPutStock 
    { 
     public static int CallbacksCompleted = 0; 
     System.Diagnostics.Stopwatch timer = Stopwatch.StartNew(); 
     int n = 0; 

     public void PutStock(Stock st) 
     { 
      ++n; 
      if (n == 1000) 
      { 
       //Console.WriteLine("1,000 results in " + this.timer.Elapsed.TotalSeconds + "s"); 

       int compelted = Interlocked.Increment(ref CallbacksCompleted); 
       if (compelted % 100 == 0) 
       { 
        Console.WriteLine("Client #{0} completed 1,000 results in {1} s", compelted, this.timer.Elapsed.TotalSeconds); 

        if (compelted == Program.CLIENT_COUNT) 
        { 
         Console.WriteLine("ALL DONE. Total number of clients: {0} Total runtime: {1} msec", Program.CLIENT_COUNT, Program.ProgramTimer.ElapsedMilliseconds); 
        } 
       } 
      } 
     } 
    } 

    class Program 
    { 
     public const int CLIENT_COUNT = 1000;   // TEST WITH DIFFERENT VALUES 

     public static System.Diagnostics.Stopwatch ProgramTimer; 

     static void StartCallPool(object uriObj) 
     { 
      string callbackUri = (string)uriObj; 
      ChannelFactory<IStock> factory = new ChannelFactory<IStock>("StockClientEndpoint"); 
      IStock proxy = factory.CreateChannel(); 

      proxy.GetStocks(callbackUri); 

      ((ICommunicationObject)proxy).Shutdown(); 
      factory.Shutdown(); 
     } 

     static void Test() 
     { 
      ThreadPool.SetMinThreads(CLIENT_COUNT, CLIENT_COUNT * 2); 

      // Create all the hosts that will recieve call backs. 
      List<ServiceHost> callBackHosts = new List<ServiceHost>(); 
      for (int i = 0; i < CLIENT_COUNT; ++i) 
      { 
       string port = string.Format("{0}", i).PadLeft(3, '0'); 
       string baseAddress = "net.tcp://localhost:7" + port + "/"; 
       ServiceHost callbackHost = new ServiceHost(typeof(Callback), new Uri[] { new Uri(baseAddress)}); 
       callbackHost.Open(); 
       callBackHosts.Add(callbackHost);    
      } 
      Console.WriteLine("All client hosts open."); 

      ServiceHost stockHost = new ServiceHost(typeof(StocksService)); 
      stockHost.Open(); 

      Console.WriteLine("Service Host opened. Starting timer..."); 
      ProgramTimer = Stopwatch.StartNew(); 

      foreach (var callbackHost in callBackHosts) 
      { 
       ThreadPool.QueueUserWorkItem(new WaitCallback(StartCallPool), callbackHost.BaseAddresses[0].AbsoluteUri); 
      } 

      Console.WriteLine("Press ENTER to close the host once you see 'ALL DONE'."); 
      Console.ReadLine(); 

      foreach (var h in callBackHosts) 
       h.Shutdown(); 
      stockHost.Shutdown(); 
     } 

     static void Main(string[] args) 
     { 
      Test(); 
     } 
    } 

    public static class Extensions 
    { 
     static public void Shutdown(this ICommunicationObject obj) 
     { 
      try 
      { 
       obj.Close(); 
      } 
      catch (Exception ex) 
      { 
       Console.WriteLine("Shutdown exception: {0}", ex.Message); 
       obj.Abort(); 
      } 
     } 
    } 
} 

的app.config:

<?xml version="1.0" encoding="utf-8" ?> 
<configuration> 
    <system.serviceModel> 
    <services> 
     <service name="StockApp.StocksService"> 
     <host> 
      <baseAddresses> 
      <add baseAddress="net.tcp://localhost:8123/StockApp/"/> 
      </baseAddresses> 
     </host> 
     <endpoint address="" binding="netTcpBinding" bindingConfiguration="tcpConfig" contract="StockApp.IStock"> 
      <identity> 
      <dns value="localhost"/> 
      </identity> 
     </endpoint> 
     </service> 

     <service name="StockApp.Callback"> 
     <host> 
      <baseAddresses> 
      <!-- Base address defined at runtime. --> 
      </baseAddresses> 
     </host> 
     <endpoint address="" binding="netTcpBinding" bindingConfiguration="tcpConfig" contract="StockApp.IPutStock"> 
      <identity> 
      <dns value="localhost"/> 
      </identity> 
     </endpoint> 
     </service> 
    </services> 

    <client> 
     <endpoint name="StockClientEndpoint" 
       address="net.tcp://localhost:8123/StockApp/" 
           binding="netTcpBinding" 
       bindingConfiguration="tcpConfig" 
           contract="StockApp.IStock" > 
     </endpoint> 

     <!-- CallbackClientEndpoint address defined at runtime. --> 
     <endpoint name="CallbackClientEndpoint" 
       binding="netTcpBinding" 
       bindingConfiguration="tcpConfig" 
       contract="StockApp.IPutStock" > 
     </endpoint> 
    </client> 

    <behaviors> 
     <serviceBehaviors> 
     <behavior> 
      <!--<serviceMetadata httpGetEnabled="true"/>--> 
      <serviceDebug includeExceptionDetailInFaults="true"/> 
      <serviceThrottling maxConcurrentCalls="1000" maxConcurrentSessions="1000" maxConcurrentInstances="1000" /> 
     </behavior> 
     </serviceBehaviors> 
    </behaviors> 

    <bindings> 
     <netTcpBinding> 
     <binding name="tcpConfig" listenBacklog="100" maxConnections="1000"> 
      <security mode="None"/> 
      <reliableSession enabled="false" /> 
     </binding> 
     </netTcpBinding> 
    </bindings> 
    </system.serviceModel> 
</configuration> 

更新: 我只是試圖用netNamedPipeBinding上述溶液:

<netNamedPipeBinding > 
    <binding name="pipeConfig" maxConnections="1000" > 
     <security mode="None"/> 
    </binding> 
    </netNamedPipeBinding> 

它實際上得到3秒慢(從20至23秒)。由於這個特定的例子都是進程間的,我不知道爲什麼。如果有人有一些見解,請評論。

2

我會說這取決於你的目標。如果你想盡可能地推動你的硬件,那麼很容易獲得10,000+個連接的客戶端,祕密是儘量減少在垃圾收集器中花費的時間並且有效地使用套接字。

我在F#這裏接幾個帖子:http://moiraesoftware.com

Im做一些正在進行的工作有一個叫骨折-IO這裏庫:https://github.com/fractureio/fracture

您可能要檢查這些出來的想法.. 。