0
我無法接收使用C在ZeroMQ套接字中的protobuf中序列化的消息。 我有客戶端輸入的序列化消息,並使用zhelpers.h中定義的s_send()函數將此緩衝區發送到服務器。作爲示例,服務器代碼是與zeromq軟件包捆綁在一起的相同測試代碼。使用protobuf序列化消息與zeromq進行通信
這裏是我的客戶端:
#include "amessage.pb-c.h"
#include "zhelpers.h"
int main (void)
{
AMessage msg = AMESSAGE__INIT; // AMessage
void *buf; // Buffer to store serialized data
unsigned len;
printf ("Connecting to server...\n");
void *context = zmq_ctx_new();
void *requester = zmq_socket (context, ZMQ_REQ);
char buffer[256] = "";
printf("[client] :");
scanf("%s", buffer);
msg.csmsg = buffer;
len = amessage__get_packed_size(&msg);
buf = malloc(len);
printf("[client]: pack msg len : %d\n ", len);
printf("Sent msg : %d\n", buf);
amessage__pack(&msg,buf);
s_send(requester, buf);
zmq_close (requester);
zmq_ctx_destroy (context);
return 0;
}
和服務器端:
#include "zhelpers.h"
#include <pthread.h>
#include <stdlib.h>
#include "amessage.pb-c.h"
#define MAX_MSG_SIZE 256
static size_t read_buffer (unsigned max_length, unsigned char *out)
{
size_t cur_len = 0, nread;
uint8_t c;
while ((nread=fread(out + cur_len, 1, max_length - cur_len, stdin)) != 0)
{
cur_len += nread;
if (cur_len == max_length)
{
fprintf(stderr, "[server]: max message length exceeded\n");
exit(1);
}
}
return cur_len;
}
static void * worker_routine (void *context)
{
AMessage *msg;
uint8_t buf[MAX_MSG_SIZE];
char buffer[256];
// Socket to talk to dispatcher
void *receiver = zmq_socket (context, ZMQ_REP);
zmq_connect (receiver, "inproc://workers");
while (1) {
uint8_t *string = s_recv (receiver);
if(string == 0)
printf("[server]: Error: In receiving msg.\n");
else
{
size_t msg_len = read_buffer (MAX_MSG_SIZE, string);
printf("[server]: client msg len is: %d.\n", msg_len);
msg = amessage__unpack(NULL, msg_len, string);
if (msg == NULL)
{
fprintf(stderr, "[server]: error unpacking incoming message\n");
exit(1);
}
printf ("[client]: %s \n", msg->csmsg);
}
amessage__free_unpacked(msg, NULL);
free (string);
// Do some 'work'
sleep (1);
}
zmq_close (receiver);
return NULL;
}
int main (void)
{
void *context = zmq_ctx_new();
void *clients = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (clients, "tcp://*:5555");
void *workers = zmq_socket (context, ZMQ_DEALER);
zmq_bind (workers, "inproc://workers");
// Launch pool of worker threads
int thread_nbr;
for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
pthread_t worker;
pthread_create (&worker, NULL, worker_routine, context);
}
// Connect work threads to client threads via a queue proxy
zmq_proxy (clients, workers, NULL);
zmq_close (clients);
zmq_close (workers);
zmq_ctx_destroy (context);
return 0;
}
任何想法,我做錯了什麼?
:謝謝你的回答。不過,我在memcpy()服務器端得到異常。 'zmq_msg_recv(&msg,receiver,0); memcpy(rBuf,zmq_msg_data(&msg),zmq_msg_size(&msg));'然而zmq_msg_size在兩端都是一樣的,不知道我在接收消息數據時做了什麼錯誤? – Sam 2013-03-13 06:52:20
什麼異常?寫入無效內存?檢查rBuf指向什麼。否則,爲新問題發佈一個新問題,因爲你的代碼似乎已經發生了很大的變化。 – jpa 2013-03-13 12:02:32