我想將服務器(ZMQ_ROUTER 套接字,負責處理多個客戶端)收到的消息傳輸到 Redis 服務器以便存儲。我聽說 Redis 不支持 ZMQ 協議,所以如果不搭建橋接層就無法實現。歡迎大家提出建議:我應該從哪裏入手?如何將 ZeroMQ 套接字連接到 Redis 服務器以進行數據傳輸?
//負載平衡的多線程版服務器:
#include "zhelpers.hpp"
#include <queue>
#include "zmq.hpp"
#include <stdio.h>
#include <string>
#include <vector>
#include "datamsg.pb.h"
using namespace google::protobuf::io;
// Checks a client reference string against the list of known entries.
//
// The first four characters of `str` are compared with every element of `s`.
// On a match, the remainder of `str` (everything after those four characters)
// is appended to `s` and true is returned. Otherwise `s` is left untouched
// and false is returned.
//
// A string shorter than four characters cannot carry a full prefix, so it is
// rejected up front. Without this check, a short string that happened to equal
// an entry of `s` would reach substr(4, ...) with a start position past the
// end of the string, which throws std::out_of_range.
bool verify(std::string str, std::vector<std::string> &s)
{
    const std::string::size_type kPrefixLen = 4;
    if (str.size() < kPrefixLen) {
        return false;
    }

    // Loop-invariant: compute the prefix once instead of on every iteration.
    const std::string prefix = str.substr(0, kPrefixLen);
    for (std::vector<std::string>::size_type i = 0; i < s.size(); ++i) {
        if (s[i] == prefix) {
            // Appending may reallocate `s`; that is safe because we return
            // immediately and never touch the old storage again.
            s.push_back(str.substr(kPrefixLen));
            return true;
        }
    }
    return false;
}
// Basic request-reply client using REQ socket
static void * worker_thread(void *arg) {
zmq::context_t context(1);
zmq::message_t worker_receive;
datamsg worker_parsed;
zmq::socket_t worker(context, ZMQ_REQ);
s_set_id(worker); // Makes tracing easier
worker.connect("ipc://backend.ipc");
// Tell backend we're ready for work
s_send(worker, "READY");
while (1) {
// Read and save all frames until we get an empty frame
worker.recv(&worker_receive);
worker_parsed.ParseFromArray(worker_receive.data(), worker_receive.size());
// printing after parsing.........
s_sendmore (worker, worker_parsed.destination());
s_sendmore (worker, "");
worker.send(worker_receive);// Here I sent the same structure back
}
return (NULL);
}
// Load-balancing broker entry point.
//
// Binds three sockets:
//   frontend      ROUTER  tcp://*:5559        client requests and replies
//   backend       ROUTER  ipc://backend.ipc   worker traffic
//   verification  REP     tcp://*:5557        reference-ID checks via verify()
// then starts three worker threads and routes messages forever. The loop has
// no exit condition, so the statements after it are never reached.
int main (int argc, char *argv[]) {
    // Positions of the sockets in the poll set. The frontend comes last so it
    // can be left out of a poll simply by passing a smaller item count.
    enum { POLL_BACKEND = 0, POLL_VERIFICATION = 1, POLL_FRONTEND = 2 };

    // Prepare our context and sockets
    zmq::context_t context(1);
    zmq::socket_t frontend (context, ZMQ_ROUTER);
    zmq::socket_t backend (context, ZMQ_ROUTER);
    zmq::socket_t verification (context, ZMQ_REP);
    verification.bind("tcp://*:5557");

    // Entries accepted by verify(); verify() also appends to this list
    std::vector<std::string> s;
    s.reserve(10);
    s.push_back("cli4");

    frontend.bind("tcp://*:5559");
    backend.bind("ipc://backend.ipc");

    zmq::message_t frontend_received;
    zmq::message_t front_get;

    for (int worker_nbr = 0; worker_nbr < 3; worker_nbr++) {
        pthread_t worker;
        pthread_create(&worker, NULL, worker_thread, NULL);
    }

    // Addresses of workers that have reported in and are waiting for work
    std::queue<std::string> worker_queue;
    while (1) {
        // Initialize poll set (rebuilt each pass, so revents starts at 0)
        zmq::pollitem_t items[] = {
            // Always poll for worker activity on backend
            { backend, 0, ZMQ_POLLIN, 0 },
            // Always poll for clients asking to verify their reference ID
            { verification, 0, ZMQ_POLLIN, 0 },
            // Polled only while at least one worker is available
            { frontend, 0, ZMQ_POLLIN, 0 }
        };
        // With no idle worker there is nobody to hand a request to, and
        // taking front() of an empty queue is undefined behaviour. Leave the
        // frontend out of the poll; client requests are simply left unread
        // on the socket until a worker reports in.
        const int poll_count = worker_queue.empty() ? 2 : 3;
        zmq::poll (items, poll_count, -1);

        if (items [POLL_BACKEND].revents & ZMQ_POLLIN) { // Handle worker activity on backend
            // Queue worker address for load-balanced routing
            worker_queue.push(s_recv (backend));
            // Second frame is empty
            std::string delimiter = s_recv (backend);
            assert (delimiter.size() == 0);
            // Third frame is READY or else a client reply address
            std::string client_addr = s_recv (backend);
            // If client reply, send rest back to frontend
            if (client_addr.compare("READY") != 0) {
                std::string reply_delimiter = s_recv (backend);
                assert (reply_delimiter.size() == 0);
                backend.recv(&frontend_received);
                s_sendmore (frontend, client_addr);
                s_sendmore (frontend, "");
                frontend.send(frontend_received);
            }
        }
        // revents stays 0 for the frontend whenever it was not polled, so
        // this branch only runs when worker_queue is non-empty.
        if (items [POLL_FRONTEND].revents & ZMQ_POLLIN) {
            // Client request is [address][request]
            std::string client_addr = s_recv (frontend);
            frontend.recv(&front_get);
            std::string worker_addr = worker_queue.front();
            worker_queue.pop();
            s_sendmore (backend, worker_addr);
            s_sendmore (backend, "");
            backend.send(front_get);
        }
        if (items [POLL_VERIFICATION].revents & ZMQ_POLLIN) {
            std::string reference = s_recv(verification);
            if (verify(reference, s)) {
                s_send(verification, "OK");
                // Everything after the 4-character prefix is the client ID
                std::cout<<"ID:"<<reference.substr(4,(reference.length()-1))<<" Has been Registered" <<std::endl;
            }
            else s_send(verification, "Verification Failed!");
        }
    }
    sleep (1);
    return 0;
}
Apache Camel是一個在不同協議之間構建總線的好項目。 – zenbeni 2014-09-22 08:29:58
爲什麼您的應用程序需要通過 ZMQ,而不是使用 Redis 的客戶端模塊/庫等直接與 Redis 通信?如果 Redis 服務器位於另一臺主機上,您需要先通過某種方式把數據發送到該主機,那麼通常的做法是爲該主機編寫一個客戶端應用程序,再由該應用程序以原生方式與 Redis 通信。 – Jason 2014-09-22 13:45:17
@Jason 我需要通過 Redis 服務器記錄所有的消息。ZMQ 和 Redis 都在本地(localhost)運行,但是我找不到讓 ZMQ 套接字與 Redis 通信的方法。我是否需要編寫一個單獨的客戶端? – monsterrrrr 2014-09-22 18:53:14