2014-02-07 60 views
2

我想調試發生在boost :: interprocess消息隊列內的零星訪問衝突。 (訪問違例讀取共享內存區域中的地址)。boost :: interprocess消息隊列創建時的競爭條件?

環境:boost 1.54,VC++ 2010。在調試&版本構建中發生。

它總是發生在或約在message_queue.hpp線854(在接收的情況下): 評論是由我添加

 recvd_size  = top_msg.len; // top_msg points to invalid location 

或線路756(在發送的情況下)

BOOST_ASSERT(free_msg_hdr.priority == 0); // free_msg_hdr points to invalid location 

看起來好像這與消息隊列創建有關。如果消息隊列是「正確」創建的(即沒有可能的競爭條件),則錯誤不會發生。 否則它可能發生在隊列上的timed_receive()或timed_send()表面上隨機的時間。

我想出了一個代表問題的簡短例子: 不幸的是,我不能在Coliru上運行它,因爲它需要兩個進程。 一個必須啓動沒有任何參數,第二個與任何單個參數。 經過多次運行後,其中一個進程將在message_queue中崩潰。

#include <iostream> 
#include <boost/interprocess/ipc/message_queue.hpp> 
#include <boost/thread.hpp> 
#include <boost/assert.hpp> 
#include <boost/date_time.hpp> 

using namespace boost::interprocess; 
using namespace boost::posix_time; 
using boost::posix_time::microsec_clock; // microsec_clock is ambiguous between boost::posix_time and boost::interprocess. What are the odds? 

int main(int argc, wchar_t** argv) 
{ 
    while(true) 
    { 
     int proc = 0; 
     message_queue* queues[2] = {NULL, NULL}; 
     std::string names[] = {"msgq0", "msgq1"}; 
     if(1 == argc) 
     { 
      proc = 0; 
      message_queue::remove(names[0].c_str()); 
      if(NULL != queues[0]) { delete queues[0]; queues[0] = NULL; } 
      queues[0] = new message_queue(open_or_create, names[0].c_str(), 128, 10240); 

      bool bRet = false; 
      do 
      { 
       try 
       { 
        if(NULL != queues[1]) { delete queues[1]; queues[1] = NULL; } 
        queues[1]=new message_queue(open_only, names[1].c_str()); 
        bRet = true; 
       } 
       catch(const interprocess_exception&) 
       { 
        //boost::this_thread::sleep(boost::posix_time::milliseconds(2)); 
        delete queues[1]; 
        queues[1] = NULL; 
        continue; 
       } 
      }while(!bRet); 

     } 
     else 
     { 
      proc = 1; 
      message_queue::remove(names[1].c_str()); 
      if(NULL != queues[1]) { delete queues[1]; queues[1] = NULL; } 
      queues[1] = new message_queue(open_or_create, names[1].c_str(), 128, 10240); 

      bool bRet = false; 
      do 
      { 
       try 
       { 
        if(NULL != queues[0]) { delete queues[0]; queues[0] = NULL; } 
        queues[0]=new message_queue(open_only, names[0].c_str()); 
        bRet = true; 
       } 
       catch(const interprocess_exception&) 
       { 
        //boost::this_thread::sleep(boost::posix_time::milliseconds(2)); 
        delete queues[0]; 
        queues[0] = NULL; 
        continue; 
       } 
      }while(!bRet); 
     } 

     long long nCnt = 0; 
     for(int i = 0; i < 1; ++i) 
     { 
      if(proc) 
      { 
       std::string sOut; 
       sOut = "Proc1 says: Hello ProcA " + std::to_string(nCnt) + " "; 
       sOut.resize(10230, ':'); 
       for(int n = 0; n < 3; ++n) 
       { 
        queues[1]->timed_send(sOut.data(), sOut.size(), 0, ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1)); 
       } 

       bool bMessage = false; 
       for(int n = 0; n < 3; ++n) 
       { 
        size_t nRec; unsigned int nPrio; 
        std::string sIn; sIn.resize(10240); 
        bMessage = queues[0]->timed_receive(&sIn[0], 10240, nRec, nPrio, ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1)); 
        if(bMessage) 
        { 
         sIn.resize(nRec); 
         //std::cout << sIn << " "; 
        } 
       } 
       if(bMessage) 
       { 
        //std::cout << std::endl; 
       } 
      } 
      else 
      { 
       std::string sOut; 
       sOut = "Proc0 says: Hello Procccccccdadae4325a " + std::to_string(nCnt); 
       sOut.resize(10240, '.'); 
       for(int n = 0; n < 3; ++n) 
       { 
        queues[0]->timed_send(sOut.data(), sOut.size(), 0, ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1)); 
       } 

       bool bMessage = false; 
       for(int n = 0; n < 3; ++n) 
       { 
        size_t nRec; unsigned int nPrio; 
        std::string sIn; sIn.resize(10240); 
        bMessage = queues[1]->timed_receive(&sIn[0], 10240, nRec, nPrio, ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1)); 
        if(bMessage) 
        { 
         sIn.resize(nRec); 
         //std::cout << sIn << " "; 
        } 
       } 
       if(bMessage) 
       { 
        //std::cout << std::endl; 
       } 
      } 

      nCnt++; 
      boost::this_thread::sleep(boost::posix_time::milliseconds(10)); 
     } 
    } 
    return 0; 
} 

我還在想我可能做錯了什麼,因爲我無法找到有關此問題的其他地方什麼,Boost庫通常是非常好的。

有沒有什麼我可能會做錯在這個例子中的message_queue的用法?

回答

1

我不認爲過程使用open_or_create是一個支持的習慣用法。您是否知道this thread on the mailing list?我無法找到更多的討論,所以在我看來,終身管理最終沒有必要被添加。

因此,您需要手動將創建與boost::interprocess同步,或者可能需要將其中一個進程重試爲open_only隊列,直到其他進程創建該隊列。

+0

謝謝!這就是這個例子所做的。正如你所建議的,每個進程創建傳出隊列(open_or_create)並使用open_only旋轉另一個隊列。然而,只是由於你的帖子和鏈接的預感,我嘗試了create_only而不是open_or_create。我目前正在測試這個,我會回來一點。 – namezero

+0

好吧,經過一些快速測試,create_only和open_only的catch塊中的睡眠(1000)似乎可以將問題緩解到在生產應用程序中不會發生的問題。但是,在測試應用程序中,我仍然可以複製它,儘管不太經常。所以似乎有一場關於創建/打開消息隊列的競賽需要解決。我感到驚訝的是,這顯然從來沒有出現任何地方,雖然關於所使用的鎖的討論使我感到有點使用它的不安。 – namezero

+0

如果再發生這種情況,我可能會考慮其他IPC隊列庫,但現在上面提到的「解決方案」似乎能夠產生可接受的結果,至少在生產應用程序中。我將把這篇文章鏈接到boost bugtracker;也許有人在那裏有一些投入。 – namezero