2016-06-28 258 views
0

我在實時快速服務器和慢速客戶端上安裝了zeromq-4.1.4庫和cppzmq。ZMQ_CONFLATE不適用於ZMQ_SUB(無過濾器)

客戶端和服務器都有2個用於發佈和訂閱的端口,通過TCP-IP進行通信。

服務器以它自己的快速率發送消息。客戶端收到最新的消息,執行一些慢計算並將消息發送回服務器。服務器讀取消息,如果有傳入並處理它。

問題是舊消息沒有被新的覆蓋。客戶端總是打印出較舊的消息,即使我關閉了服務器,消息仍會從客戶端的接收緩衝區排隊。

爲什麼發生? ZMQ_CONFLATE已設置。它不應該工作嗎?

作爲一種解決方法,我儘管把客戶端放在工作線程中以最大速率工作,然後手動保留最後一條消息。但這是一個開銷,因爲這正是zeromq在發送或接收消息時的行爲,據我瞭解。

客戶機/服務器的代碼是相同的:

void ZeromqMessenger::init(const char* pubAddress, const char* subAddress, const char* syncAddress, int flags) 
{ 
    flags_ = flags; 
    int confl = 1; 

    // Prepare our context 
    context_ = new zmq::context_t(1); 

    // Prepare ZMQ publisher 
    publisher_ = new zmq::socket_t(*context_, ZMQ_PUB); 
    publisher_->bind(pubAddress); 
    publisher_->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message 

    // Prepare ZMQ subscriber 
    subscriber_ = new zmq::socket_t(*this->context_, ZMQ_SUB); 
    subscriber_->connect(subAddress); 
    subscriber_->setsockopt(ZMQ_SUBSCRIBE, "", 0); 
    subscriber_->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message 

    if (flags_ & ZMQ_SYNC_PUB) 
    { 
    syncService_ = new zmq::socket_t(*context_, ZMQ_REP); 
    syncService_->bind(syncAddress); 
    } 

    if (flags_ & ZMQ_SYNC_SUB) 
    { 
    // synchronize with publisher 
    syncService_ = new zmq::socket_t(*context_, ZMQ_REQ); 
    syncService_->connect(syncAddress); 

    // - send a synchronization request 
    zmq::message_t message(0); 
    syncService_->send(message); 

    // - wait for synchronization reply 
    zmq::message_t update; 
    syncService_->recv(&update); 
    } 
} 

void ZeromqMessenger::sync() 
{ 
    if (connected_) 
    return; 

    if (flags_ & ZMQ_SYNC_PUB) 
    { 
    //std::cout << "Waiting for subscribers" << std::endl; 
    if (subscribers_ < subscribers_expected_) 
    { 
     // - wait for synchronization request 
     zmq::message_t update; 
     if (syncService_->recv(&update, ZMQ_DONTWAIT)) 
     { 
     // - send synchronization reply 
     zmq::message_t message(0); 
     syncService_->send(message); 

     subscribers_++; 
     } 
    } 

    if (subscribers_ == subscribers_expected_) 
     connected_ = true; 
    } 
} 

void ZeromqMessenger::send(const void* data, int size) const 
{ 
    zmq::message_t message(size); 
    memcpy(message.data(), data, size); 
    publisher_->send(message); 
} 

bool ZeromqMessenger::recv(void *data, int size, int flags) const 
{ 
    zmq::message_t update; 
    bool received = subscriber_->recv(&update, flags); 
    if(received) 
    memcpy(data, update.data(), size); 
    return received; 
} 

回答

0

我實現了線程版本,它工作得很好。這是一個帶有全局變量的非常粗糙的實現,應該對其進行細化,但至少它是有效的。

#include <zmq_messenger.h> 
#include <iostream> 
#include <thread> 
#include <mutex> 

std::string gSubAddress; 
std::mutex gMtx; 
const int gSize = 20*sizeof(double); 
char gData[gSize]; 

void *worker_routine (void *context) 
{ 
    // Prepare ZMQ subscriber 
    int confl = 1; 
    zmq::socket_t* subscriber = new zmq::socket_t(*(zmq::context_t*)context, ZMQ_SUB); 
    subscriber->connect(gSubAddress.c_str()); 
    subscriber->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message 
    subscriber->setsockopt(ZMQ_SUBSCRIBE, "", 0); 

    while (1) 
    { 
    zmq::message_t update; 
    bool received = subscriber->recv(&update, ZMQ_DONTWAIT); 
    if(received) 
    { 
     gMtx.lock(); 
     memcpy(gData, update.data(), gSize); 
     gMtx.unlock(); 
    } 
    } 
    zmq_close(subscriber); 
    return NULL; 
} 

void ZeromqMessenger::init(const char* pubAddress, const char* subAddress, const char* syncAddress, int flags) 
{ 
    flags_ = flags; 
    int confl = 1; 

    // Prepare our context 
    context_ = new zmq::context_t(1); 

    // Prepare ZMQ publisher 
    publisher_ = new zmq::socket_t(*context_, ZMQ_PUB); 
    publisher_->bind(pubAddress); 
    publisher_->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message 

    gSubAddress = std::string(subAddress); 
    pthread_create (&subscriber_worker_, NULL, worker_routine, context_); 

    if (flags_ & ZMQ_SYNC_PUB) 
    { 
    syncService_ = new zmq::socket_t(*context_, ZMQ_REP); 
    syncService_->bind(syncAddress); 
    } 

    if (flags_ & ZMQ_SYNC_SUB) 
    { 
    //std::cout << "Trying to connect" << std::endl; 

    // synchronize with publisher 
    syncService_ = new zmq::socket_t(*context_, ZMQ_REQ); 
    syncService_->connect(syncAddress); 

    // - send a synchronization request 
    zmq::message_t message(0); 
    syncService_->send(message); 

    // - wait for synchronization reply 
    zmq::message_t update; 
    syncService_->recv(&update); 

    // Third, get our updates and report how many we got 
    //std::cout << "Ready to receive" << std::endl; 
    } 
} 

void ZeromqMessenger::sync() 
{ 
    //std::cout << "sync" << std::endl; 
    if (connected_) 
    return; 

    if (flags_ & ZMQ_SYNC_PUB) 
    { 
    //std::cout << "Waiting for subscribers" << std::endl; 
    if (subscribers_ < subscribers_expected_) 
    { 
     // - wait for synchronization request 
     zmq::message_t update; 
     if (syncService_->recv(&update, ZMQ_DONTWAIT)) 
     { 
     // - send synchronization reply 
     zmq::message_t message(0); 
     syncService_->send(message); 

     subscribers_++; 
     } 
    } 

    if (subscribers_ == subscribers_expected_) 
     connected_ = true; 

    //std::cout << subscribers_ << " subscriber(s) connected" << std::endl; 
    } 
} 

void ZeromqMessenger::send(const void* data, int size) const 
{ 
    zmq::message_t message(size); 
    memcpy(message.data(), data, size); 
    publisher_->send(message); 
} 

bool ZeromqMessenger::recv(void *data, int size, int flags) const 
{ 
    assert(gSize == size); 
    gMtx.lock(); 
    memcpy(data, gData, size); 
    gMtx.unlock(); 
    return true; 
}