2015-06-19 57 views

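Missed notification of a condition variable

I am currently writing a kind of Fork/Join pattern using std::thread. For this I wrote a wrapper class for std::thread that keeps a reference counter for all of its children.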

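Whenever a child finishes its execution, the reference counter is decremented and a notification is sent to all waiting threads. A waiting thread waits for the reference counter to reach 0, which means that all child threads have finished their execution.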

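Unfortunately, it seems that notifications are sometimes missed. I debugged the program with gdb, which showed that the reference counter in the deepest blocked thread is in fact already 0, but the thread does not notice it.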

The class is called ThreadAttachment:

/** 
* \brief For the \p ThreadScheduler the attachment object is a thread itself since for each task a single thread is created. 
* 
* Children management is required for the fork/join model. It is realized by using an atomic reference counter. 
* The reference counter is initially set or changed dynamically by threadsafe operations. 
* It is decreased automatically whenever a child task finishes its execution. 
*/ 
class ThreadAttachment : public Attachment 
{ 
    public: 
     /** 
     * Creates a new thread attachment without creating the actual thread nor starting it. 
     * \param task The thread attachment is created for the corresponding task \p task. 
     */ 
     ThreadAttachment(Task *task); 
     virtual ~ThreadAttachment(); 

     /** 
     * Sets the counter of the child tasks. 
     * \note Threadsafe. 
     */ 
     void setChildCount (int count); 
     /** 
     * Increments the counter of the child tasks by one. 
     * \note Threadsafe. 
     */ 
     void incrementChildCount(); 
     /** 
     * Decrements the counter of the child tasks by one. 
     * 
     * Besides it notifies \ref m_childrenConditionVariable for all threads which means that all threads which are calling \ref joinChildren() are being awakened. 
     * \note Threadsafe. 
     */ 
     void decrementChildCount(); 
     /** 
     * \return Returns the counter of the child tasks. 
     * \note Threadsafe. 
     */ 
     int childCount(); 
     /** 
     * Joins all added children thread attachments. 
     * Waits for notifications of \ref m_childrenConditionVariable if the counter of child tasks is not already 0. 
     * Checks on each notification for the counter to become 0. If the counter is finally 0 it stops blocking and continues the execution. 
     */ 
     void joinChildren(); 

     /** 
     * Allocates the actual std::thread instance which also starts the thread immediately. 
     * The thread executes the corresponding task safely when executed itself by the operating systems thread scheduler. 
     * \note This method should only be called once. 
     */ 
     void start(); 

     /** 
     * Joins the previously with \ref start() allocated and started std::thread. 
     * If the std::thread is already done it continues immediately. 
     */ 
     void join(); 

     /** 
     * Detaches the previously with \ref start() allocated and started std::thread. 
     * This releases the thread as well as any control. 
     */ 
     void detach(); 

    private: 
     /** 
     * The thread is created in \ref start(). 
     * It must be started after all attachment properties have been set properly. 
     */ 
     std::unique_ptr<std::thread> m_thread; 
     /** 
     * This mutex protects concurrent operations on \ref m_thread. 
     */ 
     std::mutex m_threadMutex; 
     /** 
     * A reference counter for all existing child threads. 
     * If this value is 0 the thread does not have any children. 
     */ 
     std::atomic_int m_childrenCounter; 
     /** 
     * This mutex is used for the condition variable \ref m_childrenConditionVariable when waiting for a notification. 
     */ 
     std::mutex m_childrenConditionVariableMutex; 
     /** 
     * This condition variable is used to signal this thread whenever one of his children finishes and its children counter is decreased. 
     * Using this variable it can wait in \ref joinChildren() for something to happen. 
     */ 
     std::condition_variable m_childrenConditionVariable; 
}; 

The method start() starts the thread:

void ThreadAttachment::start() 
{ 
    /* 
    * Use one single attachment object only once for one single task. 
    * Do not recycle it to prevent confusion. 
    */ 
    assert(this->m_thread.get() == nullptr); 
    ThreadAttachment *attachment = this; 

    /* 
    * Lock the mutex to avoid data races on writing the unique pointer of the thread which is not threadsafe itself. 
    * When the created thread runs it can write data to itself safely. 
    * It is okay to lock the mutex in the method start() since the creation of the thread does not block. 
    * It immediately returns to the method start() in the current thread. 
    */ 
    std::mutex &mutex = this->m_threadMutex; 
    { 
     std::lock_guard<std::mutex> lock(mutex); 

     /* 
     * The attachment should stay valid until the task itself is destroyed. 
     * So it can be passed safely. 
     * 
     * http://stackoverflow.com/a/7408135/1221159 
     * 
     * Since this call does not block and the thread's function is run concurrently the mutex will be unlocked and then the thread can acquire it. 
     */ 
     this->m_thread.reset(new std::thread([attachment, &mutex]() 
     { 
      /* 
      * Synchronize with the thread's creation. 
      * This lock will be acquired after the method start() finished creating the thread. 
      * It is used as a simple barrier but should not be held for any length of time. 
      * Otherwise potential deadlocks might occur if multiple locks are being held, especially in decreaseParentsChildrenCounter() 
      */ 
      { 
       std::lock_guard<std::mutex> lock(mutex); 
      } 

      attachment->correspondingTask()->executeSafely(); 

      /* 
      * After spawning and joining in the task's logic there should be no more children left. 
      */ 
      assert(attachment->childCount() == 0); 

      /* 
      * Finally the children counter of the parent task has to be decreased. 
      * This has to be done by the scheduler since it is a critical area (access of the different attachments) and therefore must be locked. 
      */ 
      ThreadScheduler *scheduler = dynamic_cast<ThreadScheduler*>(attachment->correspondingTask()->scheduler()); 
      assert(scheduler); 
      scheduler->decreaseParentsChildrenCounter(attachment); 
     })); 
    } 
} 

This is the method decreaseParentsChildrenCounter() of the class ThreadScheduler:

void ThreadScheduler::decreaseParentsChildrenCounter(ThreadAttachment *attachment) 
{ 
    { 
     std::lock_guard<std::mutex> lock(this->m_mutex); 

     Task *child = attachment->correspondingTask(); 

     assert(child != nullptr); 

     Task *parent = child->parent(); 

     if (parent != nullptr) 
     { 
      Attachment *parentAttachment = this->attachment(parent); 
      assert(parentAttachment); 
      ThreadAttachment *parentThreadAttachment = dynamic_cast<ThreadAttachment*>(parentAttachment); 
      assert(parentThreadAttachment); 
      /* 
      * The parent's children counter must still be greater than 0 since this child is still missing. 
      */ 
      assert(parentThreadAttachment->childCount() > 0); 
      parentThreadAttachment->decrementChildCount(); 
     } 
    } 
} 

It basically calls decrementChildCount() on the parent's thread attachment.

The method joinChildren() waits for all children to finish:

void ThreadAttachment::joinChildren() 
{ 
    /* 
    * Since the condition variable is notified each time the children counter is decremented 
    * it will always awake the wait call. 
    * Otherwise the predicate check will make sure that the parent thread continues work. 
    */ 
    std::unique_lock<std::mutex> l(this->m_childrenConditionVariableMutex); 
    this->m_childrenConditionVariable.wait(l, 
     [this] 
     { 
      /* 
      * When the children counter reached 0 no more children are executing and the parent can continue its work. 
      */ 
      return this->childCount() == 0; 
     } 
    ); 
} 

These are the atomic counter operations. As you can see, I send a notification whenever the value is decremented:

void ThreadAttachment::setChildCount(int counter) 
{ 
    this->m_childrenCounter = counter; 
} 

void ThreadAttachment::incrementChildCount() 
{ 
    this->m_childrenCounter++; 
} 

void ThreadAttachment::decrementChildCount() 
{ 
    this->m_childrenCounter--; 

    /* 
    * The counter should never be less than 0. 
    * Otherwise it has not been initialized properly. 
    */ 
    assert(this->childCount() >= 0); 

    /* 
    * Notify all threads which call joinChildren(), which should usually only be its parent thread. 
    */ 
    this->m_childrenConditionVariable.notify_all(); 
} 

int ThreadAttachment::childCount() 
{ 
    return this->m_childrenCounter.load(); 
} 

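As a test case I compute Fibonacci numbers recursively with the Fork/Join pattern. I would expect that even if a notification is missed, the predicate is checked and the children counter is detected to be 0. The value evidently is 0, so how can it be missed?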

Answers


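Update the variable that affects the condition (in this case the child counter, m_childrenCounter) only while holding the mutex that belongs to the condition variable (this->m_childrenConditionVariableMutex). Otherwise the decrement and the notification can both happen after the waiting thread has evaluated the predicate but before it actually blocks in wait(), so the wake-up is lost and the predicate is never checked again.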

See this answer for the reasoning.
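
As an illustration of the rule above, here is a minimal sketch, not the asker's actual class. ChildCounter and runChildren() are hypothetical names that stand in for the child counting part of ThreadAttachment. It assumes the counter can be a plain int guarded by the same mutex that joinChildren() waits on, so that a decrement can never slip in between the predicate check and the moment the waiter blocks.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the child counting part of ThreadAttachment.
class ChildCounter
{
    public:
        void setChildCount(int count)
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_count = count;
        }

        void decrementChildCount()
        {
            {
                // Change the counter only while holding the waiter's mutex.
                std::lock_guard<std::mutex> lock(m_mutex);
                --m_count;
                assert(m_count >= 0);
            }
            // The notification itself does not need the lock.
            m_condition.notify_all();
        }

        int childCount()
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            return m_count;
        }

        void joinChildren()
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            // The lock is held whenever the predicate runs, so m_count is read directly.
            m_condition.wait(lock, [this] { return m_count == 0; });
        }

    private:
        std::mutex m_mutex;
        std::condition_variable m_condition;
        int m_count = 0;
};

// Hypothetical driver: starts `children` threads that each decrement once,
// waits for all of them and returns the counter value seen after the wait.
int runChildren(int children)
{
    ChildCounter counter;
    counter.setChildCount(children);

    std::vector<std::thread> threads;
    for (int i = 0; i < children; ++i)
    {
        threads.emplace_back([&counter] { counter.decrementChildCount(); });
    }

    counter.joinChildren();
    const int remaining = counter.childCount();

    // Join before `counter` goes out of scope; a child may still be in notify_all().
    for (std::thread &thread : threads)
    {
        thread.join();
    }
    return remaining;
}
```

Calling runChildren(8) from main() should return 0 once all eight children have decremented the counter. The counter is modified inside the lock and the notification is sent after the lock is released, so the notified thread does not immediately block on a mutex the notifier still holds.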

Hmm, look at this: http://en.cppreference.com/w/cpp/thread/condition_variable/notify_all It does not lock anything. If it did lock, how would the waiting thread acquire the lock to check the predicate?

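It says in the notes: the notifying thread does not need to hold the lock on the same mutex as the one held by the waiting thread(s); in fact doing so is a pessimization, since the notified thread would immediately block again, waiting for the notifying thread to release the lock.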

@Baradé: Sorry, updated. – mastov