2016-07-14 79 views
1

考慮以下兩個過程:ZeroMQ REQ/REP性能

sender.cpp:

#include <zhelpers.h> 
... 
zmq::context_t ctx(1); 
... 
void foo(int i) 
{ 
    zmq::socket_t sender(ctx, ZMQ_REQ); 
    sender.connect("tcp://hostname:5000"); 

    std::stringstream ss; 
    ss <<"bar_" <<i; 
    std::string bar_i(std::move(ss.str()); 

    s_sendmore(sender, "foo "); 
    (i != N) ? s_send(sender, bar, 0) : s_send(sender, "done", 0); 
    s_recv(sender); 
} 

int main() 
{ 
    for(int i=0; i<=100000; ++i) 
     foo(i); 
    return 0; 
} 

receiver.cpp

#include <zhelpers.h> 
... 
int main() 
{ 
    zmq::context_t ctx(1); 
    zmq::socket_t rcv(ctx, ZMQ_REP); 
    rcv.bind("tcp://*:5000"); 

    std::string s1(""); 
    std::string s2(""); 

    while(s2 != "done") 
    { 
     s1 = std::move(s_recv(rcv)); 
     s2 = std::move(s_recv(rcv)); 
     std::cout <<"received: " <<s1 <<" " <<s2 <<"\n"; 
     s_send(rcv, "ACK"); 
    } 

    return 0; 
} 

讓我們先從兩個過程。我會想到的是,接收器進程將接收方發送給它的所有信息,它會打印出:

foo bar_1 
foo bar_2 
... 

等等,直到:

... 
foo bar_100000 

我預計它會做到這一點沒有任何阻礙。

我的問題是,接收器總是圍繞28215th迭代(總是圍繞該數字!!!)並且阻塞直到一分鐘左右。然後進一步到100000,但有時它會再次粘住。我的問題當然是:爲什麼會發生這種情況?我該如何解決它?

我試圖把'發送者'放在全局範圍內的foo(。)中,然後它就起作用了:在這種情況下,所有打印輸出從1到100000順利且超快速地進行,沒有任何阻塞當然在這種情況下,每次調用foo(。)時都不會創建套接字)。但不幸的是,在我的代碼中,我無法做到這一點。

我想了解爲什麼會出現此塊。

+0

最大套接字可能在服務器端受到限制。嘗試增加它可能會解決它。因爲tcp需要花費時間來清理死亡套接字,並且你有很多這種情況會觸發最大數量的套接字。 – somdoron

回答

0

首先,你的例子不是非常可行的,因爲它們不編譯。因此,這裏有一些exapmles,應該是接近你的意圖和實際編譯

sender.cpp

#include <zmq.hpp> 
#include <string> 
#include <iostream> 
#include <string> 

void send(const std::string& msg) 
{ 
    // Prepare our context and socket 
    zmq::context_t context (1); 
    zmq::socket_t socket (context, ZMQ_REQ); 

    std::cout << "Connecting to receiver ..." << std::endl; 
    socket.connect ("tcp://localhost:5555"); 

    zmq::message_t request (100); 
    memcpy (request.data(), msg.c_str(), 100); 
    std::cout << "Sending message " << msg << "..." << std::endl; 
    socket.send (request); 
} 

int main() 
{ 
    for(int i = 0; i < 100000; ++i) 
    { 
     send(std::to_string(i)); 
    } 
    send("done"); 
} 

使用一些臨客

g++ -std=c++11 -I/home/dev/cppzmq -I/home/dev/libzmq/include sender.cpp -lzmq -o sender 

receiver.cpp

#include <zmq.hpp> 
#include <string> 
#include <cstring> 
#include <iostream> 

int main() { 
    // Prepare our context and socket 
    zmq::context_t context (1); 
    zmq::socket_t socket (context, ZMQ_REP); 
    socket.bind ("tcp://*:5555"); 

    char buf[100] = {0}; 
    while (std::string(buf).compare("done")) { 
     zmq::message_t request; 

     // Wait for next request from client 
     socket.recv (&request); 
     std::memcpy(buf, request.data(), 100); 
     std::cout << "Received message " << buf << std::endl; 

     // Send reply back to client 
     zmq::message_t reply (5); 
     memcpy (reply.data(), "Hello", 5); 
     socket.send (reply); 
    } 
    return 0; 
} 

使用

g++ -std=c++11 -I/home/dev/cppzmq -I/home/dev/libzmq/include receiver.cpp -lzmq -o receiver 

啓動過程的時候,一切似乎都做工精細,不出所料,沒有休息的接收器輸出:

Received message 99996 
Received message 99997 
Received message 99998 
Received message 99999 
Received message done 

但我所期待的:看看netstat的:

netstat 
Active Internet connections (w/o servers) 
Proto Recv-Q Send-Q Local Address   Foreign Address   State  
tcp  0  0 localhost:38345   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:46228   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:60309   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:46916   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:47600   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:54454   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:46409   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:51142   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:40355   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:40005   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:45614   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:48974   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:41427   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:58740   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:58754   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:60044   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:57478   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:50419   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:44361   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:37284   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:38662   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:45968   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:57407   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:59200   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:41292   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:55243   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:51489   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:48865   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:35491   localhost:5555   TIME_WAIT 
... 

一次運行後,我有超過20k(!)個這樣的插槽,處於TIME_WAIT狀態。這是因爲中socket的變化範圍void send(...)。我不清楚zmq在超出範圍時銷燬套接字時會做什麼,但我確定它將在套接字的fd上調用close(),這將使您的套接字處於TIME_WAIT狀態。即使我的發送者和接收者運行順利,我也不知道你的系統如何處理這麼多的套接字。另外,我不知道你的zhelpers.h文件在做什麼。但是我知道,如果將套接字置於全局範圍內,則在一個套接字上的發送方只會發生一次close()調用。我會從這裏開始調查更多。也許,看看how-to-forcibly-close-a-socket-in-time-wait ...

+0

謝謝。會檢查。對不起,我的代碼沒有編譯。我只是想向你展示問題本身(這就是爲什麼我使用'...'的原因)。我不想去看每一個細節。 zhelpers.h可以在這裏找到例如:https://github.com/imatix/zguide2/tree/master/examples/C%2B%2B – gybacsi

+1

另一件事是,也許你只是不想聲明你的在全局範圍內發送套接字變量,但如果您只是將範圍擴大一點,就可以用數千個TIME_WAIT聲明的套接字解​​決問題,即將它聲明在發送循環之外並重復使用它至少用於發送100000個消息。 – yussuf