我遇到了ZeroMQ的問題,我相信這是因爲我對它不是很熟悉。ZeroMQ性能問題
我試圖建立一個非常簡單的服務,其中多個客戶端連接到服務器併發送查詢。服務器響應此查詢。當我使用REQ-REP套接字組合(客戶端使用REQ,服務器綁定到REP套接字)時,我能夠在服務器端每秒接近60,000條消息(當客戶端和服務器位於同一臺計算機上時) 。當跨計算機分佈時,不同機器上的每個客戶端新實例都會線性增加服務器每秒的消息數,並可以通過足夠的客戶端實例輕鬆達到40,000+。
現在REP插座是阻塞的,所以我隨後ZeroMQ導向和所使用的rrbroker圖案(http://zguide.zeromq.org/cs:rrbroker):
REQ (client) <----> [server ROUTER -- DEALER --- REP (workers running on different threads)]
然而,這完全螺絲了性能。在跨機器運行時,服務器每秒只能收到大約4000條消息。不僅如此,每個在不同機器上啓動的新客戶端都會降低其他客戶端的吞吐量。
我很確定我在做一些愚蠢的事情。我想知道這裏的ZeroMQ專家是否可以指出任何明顯的錯誤。謝謝!
編輯:根據建議添加代碼。我正在使用clrzmq nuget包(https://www.nuget.org/packages/clrzmq-x64/)
這是客戶端代碼。計時器計算每秒接收到的響應數量。
for (int i = 0; i < numTasks; i++) { Task.Factory.StartNew(() => Client(), TaskCreationOptions.LongRunning); }
void Client()
{
using (var ctx = new Context())
{
Socket socket = ctx.Socket(SocketType.REQ);
socket.Connect("tcp://192.168.1.10:1234");
while (true)
{
socket.Send("ping", Encoding.Unicode);
string res = socket.Recv(Encoding.Unicode);
}
}
}
服務器 - 案例1:服務器跟蹤多少請求每秒
using (var zmqContext = new Context())
{
Socket socket = zmqContext.Socket(SocketType.REP);
socket.Bind("tcp://*:1234");
while (true)
{
string q = socket.Recv(Encoding.Unicode);
if (q.CompareTo("ping") == 0) {
socket.Send("pong", Encoding.Unicode);
}
}
}
收到使用此設置,在服務器端,我可以看到每秒接收大約60,000請求(當客戶端在同一臺機器上時)。在不同的計算機上時,每個新客戶端都會按預期增加服務器收到的請求數。
服務器案例2:這實質上是來自ZMQ指南的rrbroker。
void ReceiveMessages(Context zmqContext, string zmqConnectionString, int numWorkers)
{
List<PollItem> pollItemsList = new List<PollItem>();
routerSocket = zmqContext.Socket(SocketType.ROUTER);
try
{
routerSocket.Bind(zmqConnectionString);
PollItem pollItem = routerSocket.CreatePollItem(IOMultiPlex.POLLIN);
pollItem.PollInHandler += RouterSocket_PollInHandler;
pollItemsList.Add(pollItem);
}
catch (ZMQ.Exception ze)
{
Console.WriteLine("{0}", ze.Message);
return;
}
dealerSocket = zmqContext.Socket(SocketType.DEALER);
try
{
dealerSocket.Bind("inproc://workers");
PollItem pollItem = dealerSocket.CreatePollItem(IOMultiPlex.POLLIN);
pollItem.PollInHandler += DealerSocket_PollInHandler;
pollItemsList.Add(pollItem);
}
catch (ZMQ.Exception ze)
{
Console.WriteLine("{0}", ze.Message);
return;
}
// Start the worker pool; cant connect
// to inproc socket before binding.
workerPool.Start(numWorkers);
while (true)
{
zmqContext.Poll(pollItemsList.ToArray());
}
}
void RouterSocket_PollInHandler(Socket socket, IOMultiPlex revents)
{
RelayMessage(routerSocket, dealerSocket);
}
void DealerSocket_PollInHandler(Socket socket, IOMultiPlex revents)
{
RelayMessage(dealerSocket, routerSocket);
}
void RelayMessage(Socket source, Socket destination)
{
bool hasMore = true;
while (hasMore)
{
byte[] message = source.Recv();
hasMore = source.RcvMore;
destination.Send(message, message.Length, hasMore ? SendRecvOpt.SNDMORE : SendRecvOpt.NONE);
}
}
當工人池的啓動方法是:
public void Start(int numWorkerTasks=8)
{
for (int i = 0; i < numWorkerTasks; i++)
{
QueryWorker worker = new QueryWorker(this.zmqContext);
Task task = Task.Factory.StartNew(() =>
worker.Start(),
TaskCreationOptions.LongRunning);
}
Console.WriteLine("Started {0} with {1} workers.", this.GetType().Name, numWorkerTasks);
}
public class QueryWorker
{
Context zmqContext;
public QueryWorker(Context zmqContext)
{
this.zmqContext = zmqContext;
}
public void Start()
{
Socket socket = this.zmqContext.Socket(SocketType.REP);
try
{
socket.Connect("inproc://workers");
}
catch (ZMQ.Exception ze)
{
Console.WriteLine("Could not create worker, error: {0}", ze.Message);
return;
}
while (true)
{
try
{
string message = socket.Recv(Encoding.Unicode);
if (message.CompareTo("ping") == 0)
{
socket.Send("pong", Encoding.Unicode);
}
}
catch (ZMQ.Exception ze)
{
Console.WriteLine("Could not receive message, error: " + ze.ToString());
}
}
}
}
彼得,謝謝你的迴應。我已經添加了代碼。 – Andy