2013-03-11 68 views
0

我無法接收使用C在ZeroMQ套接字中的protobuf中序列化的消息。 我有客戶端輸入的序列化消息,並使用zhelpers.h中定義的s_send()函數將此緩衝區發送到服務器。作爲示例,服務器代碼是與zeromq軟件包捆綁在一起的相同測試代碼。使用protobuf序列化消息與zeromq進行通信

這裏是我的客戶端:

#include "amessage.pb-c.h" 
#include "zhelpers.h" 

int main (void) 
{ 

    AMessage msg = AMESSAGE__INIT; // AMessage 
    void *buf;      // Buffer to store serialized data 
    unsigned len; 

    printf ("Connecting to server...\n"); 
    void *context = zmq_ctx_new(); 
    void *requester = zmq_socket (context, ZMQ_REQ); 

    char buffer[256] = ""; 

    printf("[client] :"); 
    scanf("%s", buffer); 

    msg.csmsg = buffer; 
    len = amessage__get_packed_size(&msg);   
    buf = malloc(len); 

    printf("[client]: pack msg len : %d\n ", len); 
    printf("Sent msg : %d\n", buf); 

    amessage__pack(&msg,buf); 

    s_send(requester, buf); 

    zmq_close (requester); 
    zmq_ctx_destroy (context); 
    return 0; 
} 

和服務器端:

#include "zhelpers.h" 
#include <pthread.h> 
#include <stdlib.h> 
#include "amessage.pb-c.h" 

#define MAX_MSG_SIZE 256 

static size_t read_buffer (unsigned max_length, unsigned char *out) 
{ 
    size_t cur_len = 0, nread; 
    uint8_t c; 
    while ((nread=fread(out + cur_len, 1, max_length - cur_len, stdin)) != 0) 
    { 
     cur_len += nread; 
     if (cur_len == max_length) 
     { 
      fprintf(stderr, "[server]: max message length exceeded\n"); 
      exit(1); 
     } 
    } 
    return cur_len; 
} 


static void * worker_routine (void *context) 
{ 

    AMessage *msg; 

    uint8_t buf[MAX_MSG_SIZE]; 
    char buffer[256]; 

    // Socket to talk to dispatcher 
    void *receiver = zmq_socket (context, ZMQ_REP); 
    zmq_connect (receiver, "inproc://workers"); 

    while (1) { 

      uint8_t *string = s_recv (receiver); 

     if(string == 0) 
      printf("[server]: Error: In receiving msg.\n"); 
     else 
     { 

     size_t msg_len = read_buffer (MAX_MSG_SIZE, string); 
     printf("[server]: client msg len is: %d.\n", msg_len); 
     msg = amessage__unpack(NULL, msg_len, string); 
     if (msg == NULL) 
     { 
      fprintf(stderr, "[server]: error unpacking incoming message\n"); 
      exit(1); 
     } 
     printf ("[client]: %s \n", msg->csmsg); 

    } 
    amessage__free_unpacked(msg, NULL); 
    free (string); 
    // Do some 'work' 
    sleep (1); 

    } 
    zmq_close (receiver); 
    return NULL; 
} 

    int main (void) 
    { 
    void *context = zmq_ctx_new(); 
    void *clients = zmq_socket (context, ZMQ_ROUTER); 
    zmq_bind (clients, "tcp://*:5555"); 

    void *workers = zmq_socket (context, ZMQ_DEALER); 
    zmq_bind (workers, "inproc://workers"); 

    // Launch pool of worker threads 
    int thread_nbr; 
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) { 
     pthread_t worker; 
     pthread_create (&worker, NULL, worker_routine, context); 
    } 
    // Connect work threads to client threads via a queue proxy 
    zmq_proxy (clients, workers, NULL); 

    zmq_close (clients); 
    zmq_close (workers); 

    zmq_ctx_destroy (context); 
    return 0; 
    } 

任何想法,我做錯了什麼?

回答

2

您正在使用s_send(),它需要一個C字符串作爲參數,並調用strlen()來確定其大小。但是,協議緩衝區數據是二進制數據,並且可能在消息的任何位置包含空字節。

改爲使用zmq_send()並將消息的長度賦予zmq_msg_init_size()函數。

+0

:謝謝你的回答。不過,我在memcpy()服務器端得到異常。 'zmq_msg_recv(&msg,receiver,0); memcpy(rBuf,zmq_msg_data(&msg),zmq_msg_size(&msg));'然而zmq_msg_size在兩端都是一樣的,不知道我在接收消息數據時做了什麼錯誤? – Sam 2013-03-13 06:52:20

+0

什麼異常?寫入無效內存?檢查rBuf指向什麼。否則,爲新問題發佈一個新問題,因爲你的代碼似乎已經發生了很大的變化。 – jpa 2013-03-13 12:02:32

相關問題