2013-10-24 119 views
3

我遇到了ZeroMQ的問題,我相信這是因爲我對它不是很熟悉。ZeroMQ性能問題

我試圖建立一個非常簡單的服務,其中多個客戶端連接到服務器併發送查詢。服務器響應此查詢。當我使用REQ-REP套接字組合(客戶端使用REQ,服務器綁定到REP套接字)時,我能夠在服務器端每秒接近60,000條消息(當客戶端和服務器位於同一臺計算機上時) 。當跨計算機分佈時,不同機器上的每個客戶端新實例都會線性增加服務器每秒的消息數,並可以通過足夠的客戶端實例輕鬆達到40,000+。

現在REP插座是阻塞的,所以我隨後ZeroMQ導向和所使用的rrbroker圖案(http://zguide.zeromq.org/cs:rrbroker):

REQ (client) <----> [server ROUTER -- DEALER --- REP (workers running on different threads)] 

然而,這完全螺絲了性能。在跨機器運行時,服務器每秒只能收到大約4000條消息。不僅如此,每個在不同機器上啓動的新客戶端都會降低其他客戶端的吞吐量。

我很確定我在做一些愚蠢的事情。我想知道這裏的ZeroMQ專家是否可以指出任何明顯的錯誤。謝謝!

編輯:根據建議添加代碼。我正在使用clrzmq nuget包(https://www.nuget.org/packages/clrzmq-x64/

這是客戶端代碼。計時器計算每秒接收到的響應數量。

for (int i = 0; i < numTasks; i++) { Task.Factory.StartNew(() => Client(), TaskCreationOptions.LongRunning); } 

void Client() 
    { 
     using (var ctx = new Context()) 
     { 
      Socket socket = ctx.Socket(SocketType.REQ); 
      socket.Connect("tcp://192.168.1.10:1234"); 
      while (true) 
      { 
       socket.Send("ping", Encoding.Unicode); 
       string res = socket.Recv(Encoding.Unicode); 
      } 
     } 
    } 

服務器 - 案例1:服務器跟蹤多少請求每秒

using (var zmqContext = new Context()) 
{ 
    Socket socket = zmqContext.Socket(SocketType.REP); 
    socket.Bind("tcp://*:1234"); 
    while (true) 
    { 
     string q = socket.Recv(Encoding.Unicode); 
     if (q.CompareTo("ping") == 0) { 
      socket.Send("pong", Encoding.Unicode); 
     } 
    } 
}  

收到使用此設置,在服務器端,我可以看到每秒接收大約60,000請求(當客戶端在同一臺機器上時)。在不同的計算機上時,每個新客戶端都會按預期增加服務器收到的請求數。

服務器案例2:這實質上是來自ZMQ指南的rrbroker。

void ReceiveMessages(Context zmqContext, string zmqConnectionString, int numWorkers) 
    { 
     List<PollItem> pollItemsList = new List<PollItem>(); 

     routerSocket = zmqContext.Socket(SocketType.ROUTER); 
     try 
     { 
      routerSocket.Bind(zmqConnectionString); 
      PollItem pollItem = routerSocket.CreatePollItem(IOMultiPlex.POLLIN); 
      pollItem.PollInHandler += RouterSocket_PollInHandler; 
      pollItemsList.Add(pollItem); 
     } 
     catch (ZMQ.Exception ze) 
     { 
      Console.WriteLine("{0}", ze.Message); 
      return; 
     } 

     dealerSocket = zmqContext.Socket(SocketType.DEALER); 
     try 
     { 
      dealerSocket.Bind("inproc://workers"); 
      PollItem pollItem = dealerSocket.CreatePollItem(IOMultiPlex.POLLIN); 
      pollItem.PollInHandler += DealerSocket_PollInHandler; 
      pollItemsList.Add(pollItem); 
     } 
     catch (ZMQ.Exception ze) 
     { 
      Console.WriteLine("{0}", ze.Message); 
      return; 
     } 

     // Start the worker pool; cant connect 
     // to inproc socket before binding. 
     workerPool.Start(numWorkers); 

     while (true) 
     { 
      zmqContext.Poll(pollItemsList.ToArray()); 
     } 
    } 

    void RouterSocket_PollInHandler(Socket socket, IOMultiPlex revents) 
    { 
     RelayMessage(routerSocket, dealerSocket); 
    } 

    void DealerSocket_PollInHandler(Socket socket, IOMultiPlex revents) 
    { 
     RelayMessage(dealerSocket, routerSocket); 
    } 

    void RelayMessage(Socket source, Socket destination) 
    { 
     bool hasMore = true; 
     while (hasMore) 
     { 
      byte[] message = source.Recv(); 
      hasMore = source.RcvMore; 
      destination.Send(message, message.Length, hasMore ? SendRecvOpt.SNDMORE : SendRecvOpt.NONE); 
     } 
    }  

當工人池的啓動方法是:

public void Start(int numWorkerTasks=8) 
    { 
     for (int i = 0; i < numWorkerTasks; i++) 
     { 
      QueryWorker worker = new QueryWorker(this.zmqContext); 
      Task task = Task.Factory.StartNew(() => 
      worker.Start(), 
      TaskCreationOptions.LongRunning); 
     } 
     Console.WriteLine("Started {0} with {1} workers.", this.GetType().Name, numWorkerTasks); 
    } 

public class QueryWorker 
{ 
    Context zmqContext; 

    public QueryWorker(Context zmqContext) 
    { 
     this.zmqContext = zmqContext; 
    } 

    public void Start() 
    { 
     Socket socket = this.zmqContext.Socket(SocketType.REP); 
     try 
     { 
      socket.Connect("inproc://workers"); 
     } 
     catch (ZMQ.Exception ze) 
     { 
      Console.WriteLine("Could not create worker, error: {0}", ze.Message); 
      return; 
     } 

     while (true) 
     { 
      try 
      { 
       string message = socket.Recv(Encoding.Unicode); 
       if (message.CompareTo("ping") == 0) 
       { 
        socket.Send("pong", Encoding.Unicode); 
       } 
      } 
      catch (ZMQ.Exception ze) 
      { 
       Console.WriteLine("Could not receive message, error: " + ze.ToString()); 
      } 
     } 
    } 
} 

回答

1

你可以張貼一些源代碼,或者至少你的測試用例的更詳細的解釋?一般來說,構建設計的方法是每次進行一次更改,並在每次更改時進行測量。您始終可以從已知的工作設計逐步移動到更復雜的設計。

+0

彼得,謝謝你的迴應。我已經添加了代碼。 – Andy

1

很可能是'ROUTER'是瓶頸。

檢查出這一點,這些相關的問題:

  1. Client maintenance in ZMQ ROUTER
  2. Load testing ZeroMQ (ZMQ_STREAM) for finding the maximum simultaneous users it can handle

路由器(和ZMQ_STREAM,這僅僅是一個路由器的變體)內部必須保持客戶端映射,因此IMO可以接受來自特定客戶的有限連接。看起來ROUTER可以複用多個客戶端,只要每個客戶端只有一個活動連接。

我可能在這裏錯了 - 但我沒有看到相反的證據(簡單的工作代碼擴展到與ROUTER或STREAM多連接多客戶端)。

對於使用ZeroMQ的併發連接,當然有一個非常嚴格的限制,儘管看起來沒有人知道是什麼原因造成的。

1

我已經做了做性能測試上調用與C#的各種方法本地非託管的DLL函數: 1. C++/CLI包裝器 2. PInvoke的 3. ZeroMQ/clrzmq

最後可能是有趣您。

在我的性能測試結束時,我發現使用ZMQ綁定clrzmq沒有用處,並且在我嘗試優化綁定源代碼中的PInvoke調用後,性能開銷達到了100倍。因此,我使用了ZMQ而沒有綁定,但使用了PInvoke調用。這些調用必須使用cdecl約定並使用「SuppressUnmanagedCodeSecurity」選項來獲得最高速度。 我不得不導入5個相當簡單的函數。 最後,速度比PInvoke調用慢了一點,但ZMQ在我的情況下超過了「inproc」。

如果你感興趣的速度,這可能會給你暗示嘗試沒有綁定。

這不是您的問題的直接答案,但可能會幫助您提高總體表現。