4

我需要一些機制讓人想起Win32重置事件,我可以通過具有相同的語義與WaitForSingleObject()和WaitForMultipleObjects()(只需要..SingleObject()版本此時此刻) 。但我的目標是多平臺,所以我擁有的是boost :: threads(AFAIK)。我想出了下面的課,想問一下潛在的問題以及是否能夠完成任務。提前致謝。Win32重置事件像同步類與升壓C++

class reset_event 
{ 
bool flag, auto_reset; 
boost::condition_variable cond_var; 
boost::mutex mx_flag; 

public: 
reset_event(bool _auto_reset = false) : flag(false), auto_reset(_auto_reset) 
{ 
} 

void wait() 
{ 
    boost::unique_lock<boost::mutex> LOCK(mx_flag); 
    if (flag) 
    return; 

    cond_var.wait(LOCK); 
    if (auto_reset) 
    flag = false; 
} 

bool wait(const boost::posix_time::time_duration& dur) 
{ 
    boost::unique_lock<boost::mutex> LOCK(mx_flag); 
    bool ret = cond_var.timed_wait(LOCK, dur) || flag; 
    if (auto_reset && ret) 
    flag = false; 

    return ret; 
} 

void set() 
{ 
    boost::lock_guard<boost::mutex> LOCK(mx_flag); 
    flag = true; 
    cond_var.notify_all(); 
} 

void reset() 
{ 
    boost::lock_guard<boost::mutex> LOCK(mx_flag); 
    flag = false; 
} 
}; 

示例用法;

reset_event terminate_thread; 

void fn_thread() 
{ 
while(!terminate_thread.wait(boost::posix_time::milliseconds(10))) 
{ 
    std::cout << "working..." << std::endl; 
    boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); 
} 

std::cout << "thread terminated" << std::endl; 
} 

int main() 
{ 
boost::thread worker(fn_thread); 

boost::this_thread::sleep(boost::posix_time::seconds(1)); 
terminate_thread.set(); 

worker.join(); 

return 0; 
} 

編輯

我照着邁克爾·伯爾的建議固定代碼。我的「非常簡單」測試表明沒有問題。

class reset_event 
{ 
    bool flag, auto_reset; 
    boost::condition_variable cond_var; 
    boost::mutex mx_flag; 

public: 
    explicit reset_event(bool _auto_reset = false) : flag(false), auto_reset(_auto_reset) 
    { 
    } 

    void wait() 
    { 
     boost::unique_lock<boost::mutex> LOCK(mx_flag); 
     if (flag) 
     { 
      if (auto_reset) 
       flag = false; 
      return; 
     } 

     do 
     { 
      cond_var.wait(LOCK); 
     } while(!flag); 

     if (auto_reset) 
      flag = false; 
    } 

    bool wait(const boost::posix_time::time_duration& dur) 
    { 
     boost::unique_lock<boost::mutex> LOCK(mx_flag); 
     if (flag) 
     { 
      if (auto_reset) 
       flag = false; 
      return true; 
     } 

     bool ret = cond_var.timed_wait(LOCK, dur); 
     if (ret && flag) 
     { 
      if (auto_reset) 
       flag = false; 

      return true; 
     } 

     return false; 
    } 

    void set() 
    { 
     boost::lock_guard<boost::mutex> LOCK(mx_flag); 
     flag = true; 
     cond_var.notify_all(); 
    } 

    void reset() 
    { 
     boost::lock_guard<boost::mutex> LOCK(mx_flag); 
     flag = false; 
    } 
}; 

回答

3

,你要檢查/修復有幾件事情(注意 - 我絕不是說,只有這些東西 - 我只有一個快速瀏覽一下):

  • wait()功能,你不重置已經告知的事件,如果它的成立爲auto_reset

    void wait() 
    { 
        boost::unique_lock<boost::mutex> LOCK(mx_flag); 
        if (flag) { 
        if (auto_reset) flag = false; // <-- I think you need this 
        return; 
        } 
    
        cond_var.wait(LOCK); 
        if (auto_reset) 
        flag = false; 
    } 
    
  • wait(const boost::posix_time::time_duration& dur)你應該檢查flag之前等待條件變量。

  • 這兩個wait函數中,如果您等待條件變量,您可能需要重新檢查該標誌以確保其他某個線程在此期間未重置事件。對於auto_reset事件尤其如此,即使多個線程正在等待事件時,也應該釋放單個服務器。

+0

非常感謝Michael。你能解釋一下最後一項嗎? – fgungor 2011-01-14 16:33:15

1

這是我的版本,稍作調整以實現以下目標。

  • 沒有使用set(),reset()等阻塞生產者,而是計算「發佈」的數量,而不是在布爾條件爲真後丟失1:1映射。
  • 允許外部調用者指定wait()的互斥量,它在我的使用場景中通常是外部資源,並且可以與內部互斥量分開。
  • 將set(),reset_one(),reset_all()調用移至內部互斥量,現在它們在消費者調用wait()之前重複調用時不會阻塞。

現在我的加載線程可以排隊多個長期請求,而不會在繁忙處理時丟棄任何任務。

進展在我的項目中使用....

升壓條件變量: - >發送3個負載請求,線程是忙,只能看到使用布爾1或2
發佈答: - >由於共享互斥,發送3個加載請求,第2個請求上的生產者塊。生產者在第一次加載請求處理之前不會解鎖。
我的版本 - >送3個的負載要求,生產者立即從所有3個返回,消費者看到3載荷需求緩慢但肯定:)

希望這可以幫助別人那裏。

 

    class cNonLossyCondition 
    { 
     bool flag, auto_reset; 
     boost::condition_variable cond_var; 
     int lost_signals; 
     boost::mutex internal_mutex; 

    public: 
     cNonLossyCondition(bool _auto_reset) 
     { 
      this->flag = false; 
      this->auto_reset = auto_reset; 
      this->lost_signals = 0; 
     } 

     void wait(boost::mutex* mx_flag) 
     { 
      boost::unique_lock LOCK(*mx_flag); 
      if (flag) 
      { 
       if (auto_reset) 
        this->reset_one(); 
       return; 
      } 

      do 
      { 
       cond_var.wait(LOCK); 
      } while(!flag); 

      if (auto_reset) 
       this->reset_one(); 
     } 

     bool wait(boost::mutex* mx_flag,const boost::posix_time::time_duration& dur) 
     { 
      boost::unique_lock LOCK(*mx_flag); 
      if (flag) 
      { 
       if (auto_reset) 
        this->reset_one(); 
       return true; 
      } 

      bool ret = cond_var.timed_wait(LOCK, dur); 
      if (ret && flag) 
      { 
       if (auto_reset) 
        this->reset_one(); 

       return true; 
      } 

      return false; 
     } 

     void set() 
     { 
      boost::lock_guard LOCK(this->internal_mutex); 
      flag = true; 
      if (this->lost_signals lost_signals = 1; //already incremented 
      } else { 
       this->lost_signals = this->lost_signals + 1; 
      } 

      cond_var.notify_all(); 
     } 

     void reset_one() 
     { 
      boost::lock_guard LOCK(this->internal_mutex); 
      this->lost_signals = this->lost_signals - 1; 
      if (this->lost_signals lost_signals = 0; 
       flag = false; 
      } 

     } 
     void reset_all() 
     { 
      boost::lock_guard LOCK(this->internal_mutex); 
      flag = false; 
      this->lost_signals = 0; 
     } 
    };