2011-07-05 36 views
4

我使用的boost :: ASIO的boost ::線程實現它接受消息的消息服務,把他們異步如果沒有消息正在處理或隊列消息是否有消息正在處理。不正確使用升壓:: ASIO和boost ::線程

消息速率在我看來很高,約爲2.000消息每秒。有這麼多的消息,我面臨着損壞的消息,但很少。在2.000條消息中大約4-​​8損壞。我相信問題是由於不正確使用boost :: asio和/或boost :: thread庫引起的。

我執行的代碼主要基於this boost tutorial。我找不到一個錯誤,而且自從主要信息出現以後,發現我很難縮小這個問題的範圍。

也許別人有一個想法這裏發生了什麼問題?

基本上這個類是在使用方式如下:

(1)構造函數被調用我的程序的開頭,以啓動線程,因此服務,接受和傳遞消息

(2)每當我想傳輸一條消息時,我都會撥打MessageService::transmitMessage(),將async_write的任務委託給處理消息隊列的線程。

using namespace google::protobuf::io; 
using boost::asio::ip::tcp; 

MessageService::MessageService(std::string ip, std::string port) : 
    work(io_service), resolver(io_service), socket(io_service) { 

    messageQueue = new std::deque<AgentMessage>; 
    tcp::resolver::query query(ip, port); 
    endpoint_iterator = resolver.resolve(query); 

    tcp::endpoint endpoint = *endpoint_iterator; 

    socket.async_connect(endpoint, boost::bind(&MessageService::handle_connect, 
      this, boost::asio::placeholders::error, ++endpoint_iterator)); 

    boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service)); 
} 

void MessageService::await() { 

    while (!messageQueue->empty()) { 

     signal(SIGINT, exit); 

     int messagesLeft = messageQueue->size(); 
     sleep(3); 
     std::cout << "Pending Profiler Agents Messages: " 
       << messageQueue->size() << std::endl; 
     if (messagesLeft == messageQueue->size()) { 
      std::cout << "Connection Error" << std::endl; 
      break; 
     } 
    } 

    std::cout << i << std::endl; 
} 

void MessageService::write(AgentMessage agentMessage, long systemTime, 
     int JVM_ID) { 
    agentMessage.set_timestamp(Agent::Helper::getCurrentClockCycle()); 
    agentMessage.set_jvm_id(JVM_ID); 
    agentMessage.set_systemtime(systemTime); 
    io_service.post(boost::bind(&MessageService::do_write, this, agentMessage)); 
} 

void MessageService::do_close() { 
    socket.close(); 
} 

void MessageService::transmitMessage(AgentMessage agentMessage) { 

    ++i; 

    boost::asio::streambuf b; 
    std::ostream os(&b); 

    ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os); 
    CodedOutputStream *coded_output = new CodedOutputStream(raw_output); 

    coded_output->WriteVarint32(agentMessage.ByteSize()); 
    agentMessage.SerializeToCodedStream(coded_output); 

    delete coded_output; 
    delete raw_output; 

    boost::system::error_code ignored_error; 

    boost::asio::async_write(socket, b.data(), boost::bind(
      &MessageService::handle_write, this, 
      boost::asio::placeholders::error)); 
} 

void MessageService::do_write(AgentMessage agentMessage) { 

    bool write_in_progress = !messageQueue->empty(); 
    messageQueue->push_back(agentMessage); 

    if (!write_in_progress) { 
     transmitMessage(agentMessage); 
    } 
} 

void MessageService::handle_write(const boost::system::error_code &error) { 

    if (!error) { 
     messageQueue->pop_front(); 
     if (!messageQueue->empty()) { 
      transmitMessage(messageQueue->front()); 
     } 
    } else { 
     std::cout << error << std::endl; 
     do_close(); 
    } 
} 

void MessageService::handle_connect(const boost::system::error_code &error, 
     tcp::resolver::iterator endpoint_iterator) { 
    // can be used to receive commands from the Java profiler interface 
} 

MessageService::~MessageService() { 
    // TODO Auto-generated destructor stub 
} 

頭文件:

using boost::asio::ip::tcp; 

class MessageService { 
public: 
    MessageService(std::string ip, std::string port); 
    virtual ~MessageService(); 
    void write(AgentMessage agentMessage, long systemTime, int JVM_ID); 
    void await(); 

private: 
    boost::asio::io_service io_service; 
    boost::asio::io_service::work work; 
    tcp::resolver resolver; 
    tcp::resolver::iterator endpoint_iterator; 
    tcp::socket socket; 
    std::deque<AgentMessage> *messageQueue; 

    void do_write(AgentMessage agentMessage); 

    void do_close(); 

    void handle_write(const boost::system::error_code &error); 

    void handle_connect(const boost::system::error_code &error, 
      tcp::resolver::iterator endpoint_iterator); 

    void transmitMessage(AgentMessage agentMessage); 
}; 
+0

既然你提到'boost :: thread'我只能假設你在程序中使用了多個線程,但你沒有鎖定來確保訪問消息隊列(我假設這是唯一的共享資源)是安全的 - 我必須承認,我沒有經過代碼,所以也許訪問隊列是單線程的...你告訴我 –

+0

我認爲只有一個線程處理消息,因此隊列。由於唯一的功能,至少我認爲它是這種方式,這是由多個線程調用transmitMessage和transmitMessage委託與:io_service.post(boost :: bind(&MessageService :: do_write,this,agentMessage));我認爲這隻能由構造函數中啓動的一個線程執行。但也許我錯了。 –

+0

'transmitMessage'不是由多個線程調用的,你只有一個線程在io服務上調度。我看不出上面的代碼有什麼問題(只要你只從另一個線程調用'write' - 說主線程) - 我會檢查接收者代碼。 – Nim

回答

2

這種方法似乎值得懷疑到我

void MessageService::transmitMessage(AgentMessage agentMessage) { 
    ++i; 

    boost::asio::streambuf b; 
    std::ostream os(&b); 

    ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os); 
    CodedOutputStream *coded_output = new CodedOutputStream(raw_output); 

    coded_output->WriteVarint32(agentMessage.ByteSize()); 
    agentMessage.SerializeToCodedStream(coded_output); 

    delete coded_output; 
    delete raw_output; 

    boost::system::error_code ignored_error; 

    boost::asio::async_write(socket, b.data(), boost::bind(
      &MessageService::handle_write, this, 
      boost::asio::placeholders::error)); 
} 

你似乎是序列化AgentMessage(應該通過const引用BTW傳遞)成streambuf。然而,這種串行化的數據並不能保證存在,直到async_write完成處理程序被調用時,其被明確地在ASYNC_WRITE documentation

緩衝器

要被寫入包含該數據的一個或多個緩衝器中描述。 雖然緩衝區對象可能是 在必要時被複制,但調用者保留 下層存儲塊的所有權,其必須保證 它們保持有效,直到調用 處理程序。

解決此行爲,確保直到完成處理程序被調用的緩衝區保持在範圍內。一種方法是將緩衝區作爲參數傳遞給有界的完成處理程序:

boost::asio::async_write(socket, b.data(), boost::bind(
      &MessageService::handle_write, this, 
      boost::asio::placeholders::error, 
      coded_output 
      // ^^^ buffer goes here 
      )); 

然後從完成處理程序中刪除它。我建議你也看看使用shared_ptr而不是裸指針。

+0

因此,由於async_write立即返回,但在async_write完成並且transmitMessage被保留時未知,可能不確定分配的緩衝區會發生什麼情況?你會建議如何解決這個問題? –

+0

@platzhirsch當'async_write'完成的時候是衆所周知的,這是當你的完成處理程序被調用時。我用一種可能的解決方案更新了我的答案。 –