2014-09-21 46 views
0

我想將服務器上的消息(ZMQ_ROUTER套接字,處理多個客戶端)傳輸到redis服務器以用於存儲目的。我聽說,redis不會說ZMQ。所以如果不搭橋,就不可能實現。我接受你的建議。在哪裏看?將zeromq套接字連接到redis服務器以進行數據傳輸?

//負載平衡的多線程版服務器:

#include "zhelpers.hpp" 
#include <queue> 
#include "zmq.hpp" 
#include <stdio.h> 
#include <string> 
#include <vector> 
#include "datamsg.pb.h" 
using namespace google::protobuf::io; 
    bool verify(std::string str, std::vector<std::string> &s) 
    { 
    for(int q=0;q<s.size();q++) 
    {  
    if(s.at(q)==str.substr(0,4)){ 
    s.push_back(str.substr(4,str.length()-1)); 

    return true; 
    } 
} 
return false; 
} 
// Basic request-reply client using REQ socket 
static void * worker_thread(void *arg) { 
    zmq::context_t context(1); 
    zmq::message_t worker_receive; 
    datamsg worker_parsed; 
    zmq::socket_t worker(context, ZMQ_REQ); 
    s_set_id(worker); // Makes tracing easier 
    worker.connect("ipc://backend.ipc"); 
    // Tell backend we're ready for work 
    s_send(worker, "READY"); 

    while (1) { 
    // Read and save all frames until we get an empty frame 
    worker.recv(&worker_receive); 
    worker_parsed.ParseFromArray(worker_receive.data(), worker_receive.size()); 
    // printing after parsing......... 
    s_sendmore (worker, worker_parsed.destination()); 
    s_sendmore (worker, ""); 
    worker.send(worker_receive);// Here I sent the same structure back 
} 
return (NULL); 
} 

int main (int argc, char *argv[]) { 

// Prepare our context and sockets 
zmq::context_t context(1); 
zmq::socket_t frontend (context, ZMQ_ROUTER); 
zmq::socket_t backend (context, ZMQ_ROUTER); 
zmq::socket_t verification (context, ZMQ_REP); 
verification.bind("tcp://*:5557"); 
std::vector<std::string> s; 
s.reserve(10); 
s.push_back("cli4"); 
frontend.bind("tcp://*:5559"); 
backend.bind("ipc://backend.ipc"); 
zmq::message_t frontend_received; 
zmq::message_t front_get; 
int worker_nbr; 
for(worker_nbr = 0; worker_nbr < 3; worker_nbr++) { 
pthread_t worker; 
pthread_create(&worker, NULL, worker_thread, NULL); 
} 
std::queue<std::string> worker_queue; 

while (1) { 
    // Initialize poll set 
    zmq::pollitem_t items[] = { 
    // Always poll for worker activity on backend 
    { backend, 0, ZMQ_POLLIN, 0 }, 
    // Poll front-end only if we have available workers 
    { frontend, 0, ZMQ_POLLIN, 0 }, 
    //Poll for new customer for verification of client refrence ID 
    {verification,0,ZMQ_POLLIN,0 } 
}; zmq::poll (items, 3, -1); 

if (items [0].revents & ZMQ_POLLIN) { // Handle worker activity on backend 
// Queue worker address for LoadBalanced routing 
    worker_queue.push(s_recv (backend)); 

// Second frame is empty 
std::string empty = s_recv (backend); 
assert (empty.size() == 0); 

// Third frame is READY or else a client reply address 
std::string client_addr = s_recv (backend); 
// If client reply, send rest back to frontend 
if(client_addr.compare("READY") != 0) { 
    std::string empty = s_recv (backend); 
    assert (empty.size() == 0); 
    backend.recv(&frontend_received); 
    s_sendmore (frontend, client_addr); 
    s_sendmore (frontend, ""); 
    frontend.send(frontend_received); 
    //frontend.close(); 
    } 
} 
if (items [1].revents & ZMQ_POLLIN) { 
// Client request is [address][request] 

std::string client_addr = s_recv (frontend); 

frontend.recv(&front_get); 
std::string worker_addr = worker_queue.front(); 
worker_queue.pop(); 

    s_sendmore (backend, worker_addr); 
    s_sendmore (backend, ""); 
    backend.send(front_get); 

} 
if (items [2].revents & ZMQ_POLLIN) { 
std::string refrence=s_recv(verification); 
    if(verify(refrence,s)){ 
    s_send(verification,"OK"); 
    std::cout<<"ID:"<<refrence.substr(4,(refrence.length()-1))<<" Has been Registered" <<std::endl; 
} 
else s_send(verification,"Verification Failed!"); 
    } 

} 
sleep (1); 
return 0; 
} 
+0

Apache Camel是一個在不同協議之間構建總線的好項目。 – zenbeni 2014-09-22 08:29:58

+0

爲什麼您的應用程序需要使用ZMQ,而不是使用redis模塊/庫/等直接與redis對話?如果redis服務器位於另一臺主機上,您需要通過某種方式首先將數據發送給該主機,那麼通常您會爲該主機編寫一個客戶機應用程序,然後該應用程序會原生地與Redis進行通信。 – Jason 2014-09-22 13:45:17

+0

@Jason我需要通過redis服務器記錄所有的消息。 zmq和redis都本地運行(localhost)但是我找不到從zmq套接字與redis進行通信的方式。我是否需要編寫一個單獨的客戶端? – monsterrrrr 2014-09-22 18:53:14

回答

2

所以,你必須運行一個ZMQ ROUTER插座的應用程序,並且要存檔這些消息的Redis?除非你有一些你沒有提到的限制,否則你應該直接從你的應用程序連接到redis,而不是試圖通過ZMQ傳遞所有的通信。 ZMQ插槽只有有史以來與其他ZMQ套接字(沒有或多或少逆向工程ZMQ協議,但這相當於建立一個橋樑,你說你不想要)。

Redis沒有本機ZMQ連接選項。

或多或少你需要完成看起來像這樣的內容:

-------Application-------  ------------ 
|      |  | External | 
|   ZMQ socket-(|<----(| Source | 
|   v   |  ------------ 
|   V   | 
|  (Process Data) |  ---------- 
|   V   |  | Redis | 
|  Redis connector--|)---->| Server | 
|      |  ---------- 
------------------------- 

希望這是有道理的。如果您在原始問題中提供代碼,那麼我們可以直接解決該問題。

+0

+1爲殺手ascii藝術! – 2014-09-23 14:15:07

+0

@Jason其實我的應用程序完全基於zeromq。它是一個擁有多個套接字的聊天服務器。客戶端連接到它即時通訊。我想將郵件從zmq套接字傳輸到redis。所以繞過zeromq不是一個選項(至少我不知道)。 Redis連接器的含義是什麼?這是什麼類型的連接器?種類的插座?如果你想查看,我已經粘貼了所有的代碼,請高興地做,並且非常感謝你的幫助,通過可視化:) +1 – monsterrrrr 2014-09-23 17:47:40

+0

你的應用程序是用某種語言編寫的(我猜Python,但你沒有指定)。該語言具有綁定,允許您創建ZMQ套接字並將其連接到遠程ZMQ套接字。該語言還具有*獨立*綁定,可讓您連接併發送數據到redis。 – Jason 2014-09-23 17:53:37

相關問題