我有以下設置: 有一個客戶端,多個工作人員和一個接收器。 工作人員通過ZeroMQ消息接收來自客戶的工作請求。他們處理輸入,並將答案發送給另一個進程(接收器)。處理一條消息需要大約1ms,我們需要處理大約50,000條消息/秒 - 這意味着我們需要超過50名工人來處理負載。ZeroMQ工作分配
我嘗試了一個簡單的設置,其中客戶端創建一個單一的ZeroMQ PUSH套接字,所有工作人員都通過它連接(通過一個PULL)套接字。同樣,接收器創建一個單獨的PULL套接字,所有工作人員通過PUSH套接字連接到該套接字。
IIUC,ZeroMQ使用「循環法」將消息發送給工人 - 每次另一名工人獲得工作時。這個設置似乎可以有效地工作~10名工人(和適當的負載)。但是,當進一步增加工人和負載的數量時,這種情況很快就會中斷,系統開始積累延遲。
我知道有幾種模式可以解決負載均衡問題,但是它們面向多個客戶端,需要在兩者之間使用路由器,這意味着額外的代碼+ cpu週期。問題是:
1)在單個客戶端,多個工作人員,單個接收器的情況下,最佳模式是什麼?
2)是否有可能在客戶端和工作人員之間沒有路由器的情況下,通過在客戶端進行路由來完成此操作?
3)應該使用什麼樣的ZeroMQ套接字?
謝謝!
編輯: 添加代碼。
客戶:
void *context = zmq_ctx_new();
// Socket to send messages on
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_bind (sender, "tcp://*:5557");
// Socket to send start of batch message on
void *sink = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sink, "tcp://localhost:5558");
printf ("Press Enter when the workers are ready: ");
getchar();
printf ("Sending tasks to workers\n");
// The first message is "0" and signals start of batch
s_send (sink, "0");
unsigned long i;
const int nmsgs = atoi(argv[1]);
const int nmsgs_sec = atoi(argv[2]);
const int buff_size = 1024; // 1KB msgs
unsigned long t, t_start;
t_start = timestamp();
for (i = 0; i < nmsgs; i++) {
t = timestamp();
// Pace the sending according to nmsgs_sec
while(i * 1000000/(t+1-t_start) > nmsgs_sec) {
// busy wait
t = timestamp();
}
char buffer [buff_size];
// Write current timestamp in the packet beginning
sprintf (buffer, "%lu", t);
zmq_send (sender, buffer, buff_size, 0);
}
printf("Total time: %lu ms Planned time: %d ms\n", (timestamp() - t_start)/1000, nmsgs * 1000/nmsgs_sec);
zmq_close (sink);
zmq_close (sender);
zmq_ctx_destroy (context);
工人:
// Socket to receive messages on
void *context = zmq_ctx_new();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, receiver_addr);
// Socket to send messages to
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sender, sender_addr);
// Process tasks forever
const int buff_size = 1024;
char buffer[buff_size];
while (1) {
zmq_recv (receiver, buffer, buff_size, 0);
s_send (sender, buffer);
}
zmq_close (receiver);
zmq_close (sender);
zmq_ctx_destroy (context);
水槽:
// Prepare our context and socket
void *context = zmq_ctx_new();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_bind (receiver, "tcp://*:5558");
// Wait for start of batch
char *string = s_recv (receiver);
free (string);
unsigned long t1;
unsigned long maxdt = 0;
unsigned long sumdt = 0;
int task_nbr;
int nmsgs = atoi(argv[1]);
printf("nmsgs = %d\n", nmsgs);
for (task_nbr = 0; task_nbr < nmsgs; task_nbr++) {
char *string = s_recv (receiver);
t1 = timestamp();
unsigned long t0 = atoll(string);
free (string);
unsigned long dt = t1-t0;
maxdt = (maxdt > dt ? maxdt : dt);
sumdt += dt;
if(task_nbr % 10000 == 0) {
printf("%d %lu\n", task_nbr, dt);
}
}
printf("Average time: %lu usec\tMax time: %lu usec\n", sumdt/nmsgs, maxdt);
zmq_close (receiver);
zmq_ctx_destroy (context);
嗯,我的問題實際上是什麼是正確的模式(假設有一個,我可能不是第一個有這個問題)負載平衡的客戶端。 目前的實施方案沒有缺陷,但客戶端 - >工作人員或工人 - >水槽方面存在一些低效率 - 實際上很難驗證。如果有人知道問題是什麼,那也可以算作答案。 – SashaM
正如我所說的,你的模式很好(在[ZMQ指南](http://zguide.zeromq.org/page:all)中有多種模式可供選擇),你的是[this one](http:// zguide.zeromq.org/page:all#Divide-and-Conquer)。[Another](http://zguide.zeromq.org/page:all#Handling-Errors-and-ETERM))。沒有人可以猜測問題出在哪裏,而沒有一些代碼可以查看,以及一些你實際遇到的錯誤的描述。 – Jason
好,所以我已經完全實現了你引用的分治模式,就像你在我提供的代碼中看到的一樣。問題在於,與少量工人能夠獲得的亞毫秒級的延遲相比,當增加工人數量時,延遲也在增加(對於50名工人來說,大約爲15毫秒)。我的應用程序需要極低的延遲,所以我想知道還能做些什麼。 – SashaM