2015-08-17 98 views
0

我目前正在學習如何使用 ZeroMQ 庫,一位朋友建議我在個人項目中使用它。zmq::proxy() 示例不起作用

在閱讀了文檔並規劃好如何在我的項目中使用這個庫之後,我開始用文檔給出的代碼進行測試。我使用的測試是 this one。不幸的是,它不起作用。我做了一些小修改來測試它。(下面給出我測試時使用的完整代碼;代碼很長,對此我很抱歉,但我認爲如果不給出全部代碼就沒有意義,別人也無法幫到我 :/)。

我幾乎沒有改動文檔給出的測試,只是增加了一些用於測試的輸出,另外還刪除了客戶端的輪詢(poll)(我認爲問題出在這裏,因爲即使設置了超時,它也會無限期地阻塞循環)。

#include <vector> 
    #include <thread> 
    #include <memory> 
    #include <functional> 


    #include <zmq.h> 
    #include <zmq.hpp> 
    #include <zhelper.hpp> 

    // This is our client task class. 
    // It connects to the server, and then sends a request once per second 
    // It collects responses as they arrive, and it prints them out. We will 
    // run several client tasks in parallel, each with a different random ID. 
    // Attention! -- this random identity generation works well only on Linux.

    // Client task of the question's test program. It owns its own context
    // and a DEALER socket, gives the socket a random identity, connects to
    // tcp://localhost:5570 and then sends "request #N" messages in a loop.
    class client_task { 
    public: 
     // Builds the context (constructed with the argument 1) and a DEALER
     // socket on that context.
     client_task() 
      : ctx_(1), 
       client_socket_(ctx_, ZMQ_DEALER) 
     {} 

     // Thread entry point: sets the identity, connects, then loops forever
     // sending requests. A std::exception ends the loop and is discarded.
     void start() { 
      // generate random identity 
      // "%04X-%04X" needs 9 characters plus the terminator as long as both
      // values fit in four hex digits (within() comes from the helper
      // header -- presumably it returns a value below its argument; TODO
      // confirm).
      char identity[10] = {}; 
      sprintf(identity, "%04X-%04X", within(0x10000), within(0x10000)); 
      printf("-> %s\n", identity); 
      client_socket_.setsockopt(ZMQ_IDENTITY, identity, strlen(identity)); 
      client_socket_.connect("tcp://localhost:5570"); 

      // Poll item describing the client socket.
      // NOTE(review): .socket is set to the address of the zmq::socket_t
      // object here; the corrected listing further down this file uses
      // static_cast<void *>(client_socket_) instead. Nothing in this
      // version passes `items` to a poll call, so revents keeps the 0
      // assigned below.
      zmq_pollitem_t items; 
      items.socket = &client_socket_; 
      items.fd = 0; 
      items.events = ZMQ_POLLIN; 
      items.revents = 0; 

      int request_nbr = 0; 
      try { 
       while (true) { 

        for (int i = 0; i < 100; ++i) { 

         // The poll that used to wait 10 milliseconds here was removed;
         // this version calls sleep(1) instead (presumably POSIX sleep(),
         // i.e. one second -- TODO confirm).
         sleep(1); 
         std::cout << "ici" << std::endl; 
         // revents is never updated in this version (see above), so this
         // branch is not taken and replies are never dumped.
         if (items.revents & ZMQ_POLLIN) { 
          printf("\n%s ", identity); 
          s_dump(client_socket_); 
         } 

         // "request #" is 9 characters, so the 16-byte buffer leaves room
         // for at most 6 digits plus the terminator.
         char request_string[16] = {}; 
         sprintf(request_string, "request #%d", ++request_nbr); 
         client_socket_.send(request_string, strlen(request_string)); 

        } 
       } 

      } 
      // Exceptions are swallowed without any output.
      catch (std::exception &e) 
      {} 
     } 

    private: 
     zmq::context_t ctx_; 
     // Declared after ctx_ because it is constructed from it.
     zmq::socket_t client_socket_; 
    }; 

    // Each worker task works on one request at a time and sends a random number 
    // of replies back, with random delays between replies: 

    class server_worker { 
    public: 
     server_worker(zmq::context_t &ctx, int sock_type) 
      : ctx_(ctx), 
       worker_(ctx_, sock_type) 
     {} 

     void work() { 
       worker_.connect("inproc://backend"); 

      try { 
       while (true) { 
        zmq::message_t identity; 
        zmq::message_t msg; 
        zmq::message_t copied_id; 
        zmq::message_t copied_msg; 
        worker_.recv(&identity); 
        worker_.recv(&msg); 
        std::cout << "I never arrive here" << std::endl; 

        int replies = within(5); 
        for (int reply = 0; reply < replies; ++reply) { 
         std::cout << "LA" << std::endl; 
         s_sleep(within(1000) + 1); 
         copied_id.copy(&identity); 
         copied_msg.copy(&msg); 
         worker_.send(copied_id, ZMQ_SNDMORE); 
         worker_.send(copied_msg); 
        } 
       } 
      } 
      catch (std::exception &e) {} 
     } 

    private: 
     zmq::context_t &ctx_; 
     zmq::socket_t worker_; 
    }; 

    // This is our server task. 
    // It uses the multithreaded server model to deal requests out to a pool 
    // of workers and route replies back to clients. One worker can handle 
    // one request at a time but one client can talk to multiple workers at 
    // once. 

    // Server task of the question's test program. It owns a context, a
    // ROUTER frontend bound to tcp://*:5570 and a DEALER backend bound to
    // inproc://backend, starts one worker thread and then runs a proxy
    // between the two sockets.
    class server_task { 
    public: 
     // Builds the context (constructed with the argument 1) and both
     // sockets on it.
     server_task() 
      : ctx_(1), 
       frontend_(ctx_, ZMQ_ROUTER), 
       backend_(ctx_, ZMQ_DEALER) 
     {} 

     // Thread entry point: binds both sockets, launches the worker on a
     // detached thread and calls zmq::proxy. A std::exception from the
     // proxy call is discarded.
     void run() { 
      frontend_.bind("tcp://*:5570"); 
      backend_.bind("inproc://backend"); 

      // The worker is allocated with new and never deleted in this block;
      // the detached thread keeps using it through the raw pointer.
      server_worker * worker = new server_worker(ctx_, ZMQ_DEALER); 
      std::thread worker_thread(std::bind(&server_worker::work, worker)); 
      worker_thread.detach(); 

      try { 
       // NOTE(review): the arguments here are the addresses of the
       // zmq::socket_t objects; the corrected listing further down this
       // file passes static_cast<void *>(frontend_) and
       // static_cast<void *>(backend_) instead.
       zmq::proxy(&frontend_, &backend_, NULL); 
      } 
      // Exceptions are swallowed without any output.
      catch (std::exception &e) {} 

     } 

    private: 
     zmq::context_t ctx_; 
     // Both sockets are declared after ctx_ because they are built from it.
     zmq::socket_t frontend_; 
     zmq::socket_t backend_; 
    }; 

    // The main thread simply starts several clients and a server, and then 
    // waits for the server to finish. 

    int main (void) 
    { 
     client_task ct1; 
     client_task ct2; 
     client_task ct3; 
     server_task st; 

     std::thread t1(std::bind(&client_task::start, &ct1)); 
     std::thread t2(std::bind(&client_task::start, &ct2)); 
     std::thread t3(std::bind(&client_task::start, &ct3)); 
     std::thread t4(std::bind(&server_task::run, &st)); 

     t1.detach(); 
     t2.detach(); 
     t3.detach(); 
     t4.detach(); 
     std::cout << "ok" << std::endl; 
     getchar(); 
     std::cout << "ok" << std::endl; 
     return 0; 
    } 

我從這段代碼得到的輸出如下:

-> CC66-C879 
-> 3292-E961 
-> C4AA-55D1 
ok 
ici 
ici 
ici 
... (infinite ici) 

我真的不明白爲什麼這行不通。客戶端的輪詢會拋出異常「Socket operation on non-socket」(在非套接字上進行套接字操作)。我面臨的主要問題是:這是來自官方文檔的測試,而我卻無法讓它工作。我使用套接字的方式有什麼問題?

感謝您的幫助

回答

1

我發現了這個問題。

問題有兩方面:一是官方文檔的示例本身有問題(有一些明顯的錯誤,例如 zmq_pollitem_t 數組的初始化),二是另一個問題導致我的測試無法正常工作。

對於 zmq::poll 或 zmq::proxy,你需要把 socket 對象強制轉換爲 void *,而不能使用指向 socket 對象的指針。參見:ZMQ poll not working

做了這些修改之後,它就可以工作了。我在另一篇帖子裏解釋了原因:here

下面是修正後的代碼(不含我額外添加的其它測試輸出):

 // Asynchronous client-to-server (DEALER to ROUTER) 
    // 
    // While this example runs in a single process, that is to make 
    // it easier to start and stop the example. Each task has its own 
    // context and conceptually acts as a separate process. 

    #include <vector> 
    #include <thread> 
    #include <memory> 
    #include <functional> 


    #include <zmq.h> 
    #include <zmq.hpp> 
    #include <zhelper.hpp> 

    // This is our client task class. 
    // It connects to the server, and then sends a request once per second 
    // It collects responses as they arrive, and it prints them out. We will 
    // run several client tasks in parallel, each with a different random ID. 
    // Attention! -- this random identity generation works well only on Linux.

    // Asynchronous client. It owns its own context and a DEALER socket,
    // gives the socket a random identity, connects to tcp://localhost:5555
    // and then alternates between polling for replies and sending the next
    // numbered request.
    class client_task {
    public:
     // Builds the context (constructed with the argument 1) and a DEALER
     // socket on that context.
     client_task()
      : ctx_(1),
       client_socket_(ctx_, ZMQ_DEALER)
     {}

     // Thread entry point. Loops forever: polls the socket 100 times with a
     // 10 ms timeout, dumping every reply that arrives, then sends one
     // "request #N" message. Returns only when a std::exception is thrown;
     // the error is reported on stdout.
     void start() {
      // Random identity of the form "XXXX-XXXX": 9 characters plus the
      // terminator. snprintf keeps the write inside the buffer even if a
      // value does not fit in four hex digits.
      char identity[10] = {};
      snprintf(identity, sizeof identity, "%04X-%04X", within(0x10000), within(0x10000));
      printf("-> %s\n", identity);
      client_socket_.setsockopt(ZMQ_IDENTITY, identity, strlen(identity));
      client_socket_.connect("tcp://localhost:5555");

      // zmq::poll needs the underlying socket handle, obtained through
      // socket_t's conversion to void *, not the address of the wrapper
      // object. Value-initialisation zeroes fd and revents.
      zmq_pollitem_t items[1] = {};
      items[0].socket = static_cast<void *> (client_socket_);
      items[0].events = ZMQ_POLLIN;

      int request_nbr = 0;
      try {
       while (true) {
        // 100 polls of at most 10 ms each between two requests.
        for (int i = 0; i < 100; ++i) {
         zmq::poll(items, 1, 10);
         if (items[0].revents & ZMQ_POLLIN) {
          printf("\n%s =>", identity);
          s_dump(client_socket_);
         }
        }

        // "request #" plus any int value (up to 11 characters) plus the
        // terminator fits in 32 bytes; the previous 16-byte buffer
        // overflowed once the counter reached seven digits.
        char request_string[32] = {};
        snprintf(request_string, sizeof request_string, "request #%d", ++request_nbr);
        client_socket_.send(request_string, strlen(request_string));
       }
      }
      catch (const std::exception &e)
      {
       // Read the error code once, before any stream output gets a chance
       // to modify errno.
       const int err = zmq_errno();
       std::cout << "exception : " << err << " "<< e.what() << std::endl;
       if (err == EINTR)
        std::cout << "lol"<< std::endl;
      }
     }

    private:
     zmq::context_t ctx_;
     // Declared after ctx_ because it is constructed from it.
     zmq::socket_t client_socket_;
    };

    // Each worker task works on one request at a time and sends a random number 
    // of replies back, with random delays between replies: 

    class server_worker { 
    public: 
     server_worker(zmq::context_t &ctx, int sock_type) 
      : ctx_(ctx), 
       worker_(ctx_, sock_type) 
     {} 

     void work() { 
       worker_.connect("inproc://backend"); 

      try { 
       while (true) { 
        zmq::message_t identity; 
        zmq::message_t msg; 
        zmq::message_t copied_id; 
        zmq::message_t copied_msg; 
        worker_.recv(&identity); 
        worker_.recv(&msg); 

        int replies = within(5); 
        for (int reply = 0; reply < replies; ++reply) { 
         s_sleep(within(1000) + 1); 
         copied_id.copy(&identity); 
         copied_msg.copy(&msg); 
         worker_.send(copied_id, ZMQ_SNDMORE); 
         worker_.send(copied_msg); 
        } 
       } 
      } 
      catch (std::exception &e) 
      { 
       std::cout << "Error in worker : " << e.what() << std::endl; 
      } 
     } 

    private: 
     zmq::context_t &ctx_; 
     zmq::socket_t worker_; 
    }; 

    // This is our server task. 
    // It uses the multithreaded server model to deal requests out to a pool 
    // of workers and route replies back to clients. One worker can handle 
    // one request at a time but one client can talk to multiple workers at 
    // once. 

    class server_task { 
    public: 
     server_task() 
      : ctx_(1), 
       frontend_(ctx_, ZMQ_ROUTER), 
       backend_(ctx_, ZMQ_DEALER) 
     {} 

     void run() { 
      frontend_.bind("tcp://*:5555"); 
      backend_.bind("inproc://backend"); 

      server_worker * worker = new server_worker(ctx_, ZMQ_DEALER); 
      std::thread worker_thread(std::bind(&server_worker::work, worker)); 
      worker_thread.detach(); 

      try { 
       zmq::proxy(static_cast<void *>(frontend_), static_cast<void *> (backend_), NULL); 
      } 
      catch (std::exception &e) 
      { 
       std::cout << "Error in Server : " << e.what() << std::endl; 
      } 

     } 

    private: 
     zmq::context_t ctx_; 
     zmq::socket_t frontend_; 
     zmq::socket_t backend_; 
    }; 

    // The main thread simply starts several clients and a server, and then 
    // waits for the server to finish. 

    int main (void) 
    { 
     client_task ct1; 
     client_task ct2; 
     client_task ct3; 
     server_task st; 

     std::thread t4(std::bind(&server_task::run, &st)); 
     t4.detach(); 
     std::thread t1(std::bind(&client_task::start, &ct1)); 
     std::thread t2(std::bind(&client_task::start, &ct2)); 
     std::thread t3(std::bind(&client_task::start, &ct3)); 

     t1.detach(); 
     t2.detach(); 
     t3.detach(); 

     getchar(); 
     return 0; 
    }