0
我在實時快速服務器和慢速客戶端上安裝了zeromq-4.1.4庫和cppzmq。ZMQ_CONFLATE不適用於ZMQ_SUB(無過濾器)
客戶端和服務器都有2個用於發佈和訂閱的端口,通過TCP-IP進行通信。
服務器以它自己的快速率發送消息。客戶端收到最新的消息,執行一些慢計算並將消息發送回服務器。服務器讀取消息,如果有傳入並處理它。
問題是舊消息沒有被新的覆蓋。客戶端總是打印出較舊的消息,即使我關閉了服務器,消息仍會從客戶端的接收緩衝區排隊。
爲什麼發生? ZMQ_CONFLATE已設置。它不應該工作嗎?
作爲一種解決方法,我儘管把客戶端放在工作線程中以最大速率工作,然後手動保留最後一條消息。但這是一個開銷,因爲這正是zeromq在發送或接收消息時的行爲,據我瞭解。
客戶機/服務器的代碼是相同的:
void ZeromqMessenger::init(const char* pubAddress, const char* subAddress, const char* syncAddress, int flags)
{
flags_ = flags;
int confl = 1;
// Prepare our context
context_ = new zmq::context_t(1);
// Prepare ZMQ publisher
publisher_ = new zmq::socket_t(*context_, ZMQ_PUB);
publisher_->bind(pubAddress);
publisher_->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message
// Prepare ZMQ subscriber
subscriber_ = new zmq::socket_t(*this->context_, ZMQ_SUB);
subscriber_->connect(subAddress);
subscriber_->setsockopt(ZMQ_SUBSCRIBE, "", 0);
subscriber_->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message
if (flags_ & ZMQ_SYNC_PUB)
{
syncService_ = new zmq::socket_t(*context_, ZMQ_REP);
syncService_->bind(syncAddress);
}
if (flags_ & ZMQ_SYNC_SUB)
{
// synchronize with publisher
syncService_ = new zmq::socket_t(*context_, ZMQ_REQ);
syncService_->connect(syncAddress);
// - send a synchronization request
zmq::message_t message(0);
syncService_->send(message);
// - wait for synchronization reply
zmq::message_t update;
syncService_->recv(&update);
}
}
void ZeromqMessenger::sync()
{
if (connected_)
return;
if (flags_ & ZMQ_SYNC_PUB)
{
//std::cout << "Waiting for subscribers" << std::endl;
if (subscribers_ < subscribers_expected_)
{
// - wait for synchronization request
zmq::message_t update;
if (syncService_->recv(&update, ZMQ_DONTWAIT))
{
// - send synchronization reply
zmq::message_t message(0);
syncService_->send(message);
subscribers_++;
}
}
if (subscribers_ == subscribers_expected_)
connected_ = true;
}
}
void ZeromqMessenger::send(const void* data, int size) const
{
zmq::message_t message(size);
memcpy(message.data(), data, size);
publisher_->send(message);
}
bool ZeromqMessenger::recv(void *data, int size, int flags) const
{
zmq::message_t update;
bool received = subscriber_->recv(&update, flags);
if(received)
memcpy(data, update.data(), size);
return received;
}