2015-06-15 102 views
2

我有以下設置: 有一個客戶端,多個工作人員和一個接收器。 工作人員通過ZeroMQ消息接收來自客戶的工作請求。他們處理輸入,並將答案發送給另一個進程(接收器)。處理一條消息需要大約1ms,我們需要處理大約50,000條消息/秒 - 這意味着我們需要超過50名工人來處理負載。ZeroMQ工作分配

我嘗試了一個簡單的設置,其中客戶端創建一個單一的ZeroMQ PUSH套接字,所有工作人員都通過它連接(通過一個PULL)套接字。同樣,接收器創建一個單獨的PULL套接字,所有工作人員通過PUSH套接字連接到該套接字。

IIUC,ZeroMQ使用「循環法」將消息發送給工人 - 每次另一名工人獲得工作時。這個設置似乎可以有效地工作~10名工人(和適當的負載)。但是,當進一步增加工人和負載的數量時,這種情況很快就會中斷,系統開始積累延遲。

我知道有幾種模式可以解決負載均衡問題,但是它們面向多個客戶端,需要在兩者之間使用路由器,這意味着額外的代碼+ cpu週期。問題是:

1)在單個客戶端,多個工作人員,單個接收器的情況下,最佳模式是什麼?

2)是否有可能在客戶端和工作人員之間沒有路由器的情況下,通過在客戶端進行路由來完成此操作?

3)應該使用什麼樣的ZeroMQ套接字?

謝謝!

Diagram

編輯: 添加代碼。

客戶:

void *context = zmq_ctx_new(); 

    // Socket to send messages on 
    void *sender = zmq_socket (context, ZMQ_PUSH); 
    zmq_bind (sender, "tcp://*:5557"); 

    // Socket to send start of batch message on 
    void *sink = zmq_socket (context, ZMQ_PUSH); 
    zmq_connect (sink, "tcp://localhost:5558"); 

    printf ("Press Enter when the workers are ready: "); 
    getchar(); 
    printf ("Sending tasks to workers\n"); 

    // The first message is "0" and signals start of batch 
    s_send (sink, "0"); 

    unsigned long i; 
    const int nmsgs = atoi(argv[1]); 
    const int nmsgs_sec = atoi(argv[2]); 
    const int buff_size = 1024; // 1KB msgs 
    unsigned long t, t_start; 
    t_start = timestamp(); 
    for (i = 0; i < nmsgs; i++) { 
      t = timestamp(); 
      // Pace the sending according to nmsgs_sec 
      while(i * 1000000/(t+1-t_start) > nmsgs_sec) { 
        // busy wait 
        t = timestamp(); 
      } 
      char buffer [buff_size]; 
      // Write current timestamp in the packet beginning 
      sprintf (buffer, "%lu", t); 
      zmq_send (sender, buffer, buff_size, 0); 
    } 
    printf("Total time: %lu ms Planned time: %d ms\n", (timestamp() - t_start)/1000, nmsgs * 1000/nmsgs_sec); 

    zmq_close (sink); 
    zmq_close (sender); 
    zmq_ctx_destroy (context); 

工人:

// Socket to receive messages on 
void *context = zmq_ctx_new(); 
void *receiver = zmq_socket (context, ZMQ_PULL); 
zmq_connect (receiver, receiver_addr); 

// Socket to send messages to 
void *sender = zmq_socket (context, ZMQ_PUSH); 
zmq_connect (sender, sender_addr); 

// Process tasks forever 
const int buff_size = 1024; 
char buffer[buff_size]; 
while (1) { 
    zmq_recv (receiver, buffer, buff_size, 0); 
    s_send (sender, buffer); 
} 
zmq_close (receiver); 
zmq_close (sender); 
zmq_ctx_destroy (context); 

水槽:

// Prepare our context and socket 
void *context = zmq_ctx_new(); 
void *receiver = zmq_socket (context, ZMQ_PULL); 
zmq_bind (receiver, "tcp://*:5558"); 

// Wait for start of batch 
char *string = s_recv (receiver); 
free (string); 

unsigned long t1; 
unsigned long maxdt = 0; 
unsigned long sumdt = 0; 

int task_nbr; 
int nmsgs = atoi(argv[1]); 
printf("nmsgs = %d\n", nmsgs); 
for (task_nbr = 0; task_nbr < nmsgs; task_nbr++) { 
    char *string = s_recv (receiver); 
    t1 = timestamp(); 
    unsigned long t0 = atoll(string); 
    free (string); 

    unsigned long dt = t1-t0; 
    maxdt = (maxdt > dt ? maxdt : dt); 
    sumdt += dt; 

    if(task_nbr % 10000 == 0) { 
      printf("%d %lu\n", task_nbr, dt); 
    } 
} 

printf("Average time: %lu usec\tMax time: %lu usec\n", sumdt/nmsgs, maxdt); 

zmq_close (receiver); 
zmq_ctx_destroy (context); 

回答

0

你有多種選擇,這取決於其中實際誤差在當前設置此起彼伏(根據您提供的信息無法分辨)。

你絕對不需要另一個「中間」節點。

如果問題是連接數量(1-> 50)是您當前設置中的問題,您可以在客戶端上設置多個PUSH套接字,每個套接字都有一部分工作負載,內部在客戶端。

如果問題在於PUSH插座本身,您可以改爲使用「推」側的DEALER插座和「拉」側的ROUTER插座。但我不認爲這是問題。

一般而言,我希望您的當前設置是「正確的」,並且您的實現中可能存在一個錯誤。你知道錯誤引入的地方嗎?客戶端 - >工人或工人 - >接收器?或者也許在其他地方?

+0

嗯,我的問題實際上是什麼是正確的模式(假設有一個,我可能不是第一個有這個問題)負載平衡的客戶端。 目前的實施方案沒有缺陷,但客戶端 - >工作人員或工人 - >水槽方面存在一些低效率 - 實際上很難驗證。如果有人知道問題是什麼,那也可以算作答案。 – SashaM

+0

正如我所說的,你的模式很好(在[ZMQ指南](http://zguide.zeromq.org/page:all)中有多種模式可供選擇),你的是[this one](http:// zguide.zeromq.org/page:all#Divide-and-Conquer)。[Another](http://zguide.zeromq.org/page:all#Handling-Errors-and-ETERM))。沒有人可以猜測問題出在哪裏,而沒有一些代碼可以查看,以及一些你實際遇到的錯誤的描述。 – Jason

+0

好,所以我已經完全實現了你引用的分治模式,就像你在我提供的代碼中看到的一樣。問題在於,與少量工人能夠獲得的亞毫秒級的延遲相比,當增加工人數量時,延遲也在增加(對於50名工人來說,大約爲15毫秒)。我的應用程序需要極低的延遲,所以我想知道還能做些什麼。 – SashaM