2013-10-28 38 views
1

我們正在開發一個項目來生成一個通用線程類,以允許我們處理一組相互關聯的數據。 其基本思想是在不同的線程中評估只有未連接和可以同時處理的數據集。 我們基於boost :: thread和基於boost :: mutex的OF_bmutex類開發了一個ThreadClass,以執行日誌記錄操作。Windows上的線程升級

代碼的方案是在鏈接的PDF(http://cdm.unimore.it/dep/test.pd),而主類的骨架低於...

// encapsulate boost::mutex to log... 
class OF_bmutex{ 

public: 

    std::string mutex_type; 
    int m_id; 
    boost::mutex m; 

    void lock(){ 
     std::cout << "Mutex " << mutex_type << m_id << " locking from " << boost::this_thread::get_id() << std::endl;  
     m.lock();  
     std::cout << "Mutex " << mutex_type << m_id << " locked from " << boost::this_thread::get_id() << std::endl; 
    } 

    void unlock(){ 
     std::cout << "Mutex " << mutex_type << m_id << " unlocking from " << boost::this_thread::get_id() << std::endl; 
     m.unlock(); 
     std::cout << "Mutex " << mutex_type << m_id << " unlocked from " << boost::this_thread::get_id() << std::endl; 
    } 

    bool try_lock(){ 
     std::cout << "Mutex " << mutex_type << m_id << " try locking from " << boost::this_thread::get_id() << std::endl; 
     bool ret = m.try_lock(); 
     if(ret){ 
      std::cout << "Mutex " << mutex_type << m_id << " try locked from " << boost::this_thread::get_id() << std::endl; 
     } 
     return(ret); 
    } 
}; 




// My thread class 
class OF_ThreadClass { 
    private: 
     //! running variable 
     bool running; 
     //! The thread executing this process... 
     boost::thread *m_thread; 
     //! The data to process... 
     LinkedDataSet *my_data; 
     //! The id of this thread 
     int thread_id; 
     //! Process the data... 
     virtual int processData(); 
    public: 
     //! The boost thread id 
     boost::thread::id boost_id; 
     //! Thread function 
     void operator()(); 
     //! Default constructor 
     OF_ThreadClass(); 
     //! Destructor 
     ~OF_ThreadClass(); 
     //! Connect this thread with the process data to evaluate 
     void setProcessData(DataToProcess *pd); 
     //! return the thread id 
     int getId() const { return this->thread_id; } 
     //! post process the thread... 
     void post_process(); 
}; 




// The core function with the execution point of the Thread class... 
void OF_ThreadClass::operator()(){ 

    while(this->running){ 

     OF_AVAILABLE_THREADS_MUTEX[ this->thread_id ]->unlock(); 
     OF_RUNNING_THREADS_MUTEX[ this->thread_id ]->lock(); 

     // PUT HERE OUR CODE... 
     if(running == true){ 
      if(my_data != NULL){ 
       this->processData(); 
      } 
      this->my_data->done = true; 
     } 
     std::cout << ">>>>>> Thread " << thread_id << " notified that evaluation terminated\n"; 

     OF_RUNNING_THREADS_MUTEX[ this->thread_id ]->unlock();    
     OF_AVAILABLE_THREADS_MUTEX[ this->thread_id ]->lock(); 
    } 
    OF_AVAILABLE_THREADS_MUTEX[ this->thread_id ]->unlock(); 
} 




// A class to perform multithread calculation... 
class OF_SmartThreads{ 
    private: 

    //! The number of threads to activate 
    int  _n_threads; 
    //! The polling time 
    int  _polling_time; 
    //! The thread pool... 
    std::vector< OF_ThreadClass *> threadPool; 

    //! The stack of the available threads 
    std::set< OF_ThreadClass *> *OF_AVAILABLE_THREADS; 
    //! The set of the running threads 
    std::set< OF_ThreadClass *> OF_RUNNING_THREADS; 

    //! The set of the locked datasets 
    std::set< LinkedDataSet* > locked_data; 
    //! The set of the available datasets 
    std::set< LinkedDataSet* > unlocked_data; 
    //! The set of the datasets under processing 
    std::set< LinkedDataSet* > processing_data; 

    //! The size of the progress bar 
    int progBarDim; 

    public: 
    //! Constructor 
    OF_SmartThreads(); 
    //! Destructor 
    ~OF_SmartThreads(); 

    //! Initialize the SmartThreads 
    int init_function(std::list< LinkedDataSet * > *dList, int n_max_threads); 
    //! Initialize the SmartThreads 
    int init_function(std::set< LinkedDataSet * > *dSet, int n_max_threads); 

    //! Process all the cuncurrent threads.. 
    int process_data(); 
    //! Process all the cuncurrent threads.. 
    int process_data_polling(int polling_time); 
    //! stop the process.. 
    int post_process(); 
}; 




// Initialization function... 
int OF_SmartThreads::init_function(...){ 

    // in the main thread... 

    // Fill the pool of thread mutex... 
    for(int i = 0; i< _n_threads; i++){ 

     _tm = new OF_BMUTEX; 
     _tm->mutex_type.assign("A"); 
     _tm->m_id = i; 
     OF_AVAILABLE_THREADS_MUTEX.push_back(_tm); 

     _tm = new OF_BMUTEX; 
     _tm->mutex_type.assign("R"); 
     _tm->m_id = i; 
     OF_RUNNING_THREADS_MUTEX.push_back(_tm); 
    } 

    // Create the threads... 
    threadPool.resize(_n_threads); 
    for(int i = 0; i< _n_threads; i++){ 

     // ...preventivally lock the resources... 
     OF_RUNNING_THREADS_MUTEX[i]->lock(); 
     OF_AVAILABLE_THREADS_MUTEX[i]->unlock(); 

     // ..create the new thread... 
     pc = new OF_ThreadClass; 
     // insert the new thread in the list... 
     threadPool.at(pc->getId()) = pc; 
     // set it as available... 
     OF_AVAILABLE_THREADS->insert(pc); 
    } 
} 



// Execution function... 
void process_data_polling(int _polling_time){ 
    while (running){ 
     if (something_changed){ 

      //Print the status on the screen... 
      ... 
     } 

     something_changed = false; 

     // Poll the status of the processing data periodically  
     boost::this_thread::sleep(boost::posix_time::millisec(_polling_time)); 

     // Are there some data ready to process? 
     if(OF_UNLOCKED_DATASETS->size() > 0){ 

      // Take the first 
      pd = *OF_UNLOCKED_DATASETS->begin(); 

      // are there some threads available? 
      if(OF_AVAILABLE_THREADS->size() != 0){ 

       //...lock and move the datasets linked to pd... 
       ret = lock_data(pd, LOCK); 

       std::cout << "\tNumber of available threads: " << OF_AVAILABLE_THREADS->size() << std::endl; 

       // Take the available thread... 
       pc = *OF_AVAILABLE_THREADS->begin(); 

       // ...link it the dataset to process... 
       pc->setProcess(pd); 

       OF_AVAILABLE_THREADS_MUTEX[ pc->getId() ]->lock(); 

       OF_RUNNING_THREADS_MUTEX[ pc->getId() ]->unlock(); 

       something_changed = true; 

      } // available threads 
     } // unlock datasets 



     // Find, unlock and remove finished datasets... 
     pIter2 = OF_RUNNING_THREADS->begin(); 
     pEnd2 = OF_RUNNING_THREADS->end(); 
     while(pIter2 != pEnd2){ 
      pc = *pIter2++; 
      pd = pc->getDataSet(); 

      if(pd->isDone()){ 

       //...unlock and move the datasets linked to the current dataset... 
       ret_move = lock_data(pd, RELEASE_LOCK); 

       //...remove the data from the active set 
       ret_remove = OF_ACTIVE_DATASETS->erase(pd); 

       // make the threads available 
       moveThreads(pc, _RUNNING_, _AVAILABLE_); 

       something_changed = true; 
      } 
     } 



     pIter2 = OF_AVAILABLE_THREADS->begin(); 
     pEnd2 = OF_AVAILABLE_THREADS->end(); 
     while(pIter2 != pEnd2){ 
      pc = *pIter2++; 

      bool obtained = OF_RUNNING_THREADS_MUTEX[ pc->getId() ]->try_lock(); 
      if(obtained){ 
       std::cout << "\t\t\tOF_SMART_THREADS: Thread " << pc->getId() << " obtained running mutex..." << std::endl; 
      } 
      else{ 
       std::cout << "\t\t\tOF_SMART_THREADS: Thread " << pc->getId() << " failed to obtain running mutex..." << std::endl; 
      } 
      OF_AVAILABLE_THREADS_MUTEX[ pc->getId() ]->unlock(); 
      std::cout << "\t\t\tOF_SMART_THREADS: Thread " << pc->getId() << " released available mutex..." << std::endl; 
     } 

     if( (OF_LOCKED_DATASETS->size() + OF_UNLOCKED_DATASETS->size() + OF_ACTIVE_DATASETS->size()) > 0){ 
      running = true; 
     } 
     else{ 
      running = false; 
     } 

    } // end running... 
} 



// The main function... 
int main(int argc, char* argv[]) { 

    init_function(&data, INT_MAX); 

    process_data_polling(100); 

    lc.post_process(); 

    return 0; 
} 

所有提升1.53編譯的Linux和OSX完全在系統功能。使用的線程數爲2.日誌摘錄如下。 注意從適當的線程發出的互斥日誌...

---> LOG FROM OSX ... 

--------------------------------- 
Number of data: 2 
Data: 0, links: 
Data: 1, links: 

---> OF_SmartThreads::init_function -- 
------------------------------------ 
--> 8 processors/cores detected. 
--> n_max_threads = 2 
------------------------------------ 
       Mutex R0 locking from thread master 
       Mutex R0 locked from thread master 
       Mutex R0 try locking from thread master 
      OF_SMART_THREADS: Thread 0 failed to obtain running mutex... 
       Mutex A0 unlocking from thread master 
       Mutex A0 unlocked from thread master 
New thread 0 created 
       Mutex R1 locking from thread master 
       Mutex R1 locked from thread master 
       Mutex R1 try locking from thread master 
      OF_SMART_THREADS: Thread 1 failed to obtain running mutex... 
       Mutex A1 unlocking from thread master 
       Mutex A1 unlocked from thread master 
New thread 1 created 
--------------------------------- 
Available threads: 2 
Unlocked datasets: 2 


---> OF_SmartThreads::process_data_function 

       Mutex A1 unlocking from thread1 
       Mutex A1 unlocked from thread1 
>>>>>> Thread 1 released available mutex... 
       Mutex R1 locking from thread1 
       Mutex A0 unlocking from thread0 
       Mutex A0 unlocked from thread0 
>>>>>> Thread 0 released available mutex... 
       Mutex R0 locking from thread0 

UNLOCKED DATASETS : 0 1 
LOCKED DATASETS : 
ACTIVE DATASETS : 
RUNNING THREADS : 
    OF_SMART_THREADS: THREADS AVAILABLE 
    Number of available threads: 2 
    OF_SMART_THREADS: take the thread 0 
    OF_SMART_THREADS: Thread master try to lock available mutex... 0 
       Mutex A0 locking from thread master 
       Mutex A0 locked from thread master 
    OF_SMART_THREADS: Thread obtained available mutex... 0 
    OF_SMART_THREADS: Thread try to unlock running mutex... 0 
       Mutex R0 unlocking from thread master 
       Mutex R0 unlocked from thread master 
    OF_SMART_THREADS: Thread released running mutex... 0 

      OF_SMART_THREADS: PREPARE AVAILABLE THREADS 
       Mutex R1 try locking from thread master 
      OF_SMART_THREADS: Thread 1 failed to obtain running mutex... 
       Mutex A1 unlocking from thread master 
       Mutex A1 unlocked from thread master 
      OF_SMART_THREADS: Thread 1 released available mutex... 

UNLOCKED DATASETS : 1 
LOCKED DATASETS : 
ACTIVE DATASETS : 0 
RUNNING THREADS : 0->0 
       Mutex R0 locked from thread0 
>>>>>> Thread 0 obtained running mutex... 
>>>>>> Thread 0 is going to process the dataset 0 
>>>>>> Thread 0 terminated to process the dataset 0 
>>>>>> Thread 0 notified that evaluation terminated 
       Mutex R0 unlocking from thread0 
       Mutex R0 unlocked from thread0 
>>>>>> Thread 0 released running mutex... 
       Mutex A0 locking from thread0 
    OF_SMART_THREADS: THREADS AVAILABLE 
    Number of available threads: 1 
    OF_SMART_THREADS: take the thread 1 
    OF_SMART_THREADS: Thread master try to lock available mutex... 1 
       Mutex A1 locking from thread master 
       Mutex A1 locked from thread master 
    OF_SMART_THREADS: Thread obtained available mutex... 1 
    OF_SMART_THREADS: Thread try to unlock running mutex... 1 
       Mutex R1 unlocking from thread master 
       Mutex R1 unlocked from thread master 
    OF_SMART_THREADS: Thread released running mutex... 1 

     OF_SMART_THREADS: CHECK THREADS DONE 
     ------------> DATASETS 0 done... 
     ------------> DATASETS 0 removed from the active set. 

      OF_SMART_THREADS: PREPARE AVAILABLE THREADS 
       Mutex R0 try locking from thread master 
       Mutex R0 try locked from thread master 
      OF_SMART_THREADS: Thread 0 obtained running mutex... 
       Mutex R1 locked from thread1 
       Mutex A0 unlocking from thread master 
>>>>>> Thread 1 obtained running mutex... 
       Mutex A0 unlocked from thread master 
>>>>>> Thread 1 is going to process the dataset 1 
       Mutex A0 locked from thread0 
      OF_SMART_THREADS: Thread 0 released available mutex... 
>>>>>> Thread 0 obtained available mutex... 

UNLOCKED DATASETS : 
LOCKED DATASETS : 
ACTIVE DATASETS : 1 
RUNNING THREADS : 1->1 
>>>>>> Thread 1 terminated to process the dataset 1 
       Mutex A0 unlocking from thread0 
>>>>>> Thread 1 notified that evaluation terminated 
       Mutex A0 unlocked from thread0 
       Mutex R1 unlocking from thread1 
>>>>>> Thread 0 released available mutex... 
       Mutex R1 unlocked from thread1 
       Mutex R0 locking from thread0 
>>>>>> Thread 1 released running mutex... 
       Mutex A1 locking from thread1 

     OF_SMART_THREADS: CHECK THREADS DONE 
     ------------> DATASETS 1 done... 
     ------------> DATASETS 1 removed from the active set. 

      OF_SMART_THREADS: PREPARE AVAILABLE THREADS 
       Mutex R0 try locking from thread master 
      OF_SMART_THREADS: Thread 0 failed to obtain running mutex... 
       Mutex A0 unlocking from thread master 
       Mutex A0 unlocked from thread master 
      OF_SMART_THREADS: Thread 0 released available mutex... 
       Mutex R1 try locking from thread master 
       Mutex R1 try locked from thread master 
      OF_SMART_THREADS: Thread 1 obtained running mutex... 
       Mutex A1 unlocking from thread master 
       Mutex A1 unlocked from thread master 
      OF_SMART_THREADS: Thread 1 released available mutex... 

OF_SMART_THREADS: ALL THE DATASETS HAS BEEN SUCCESFULLY PROCESSED... 


       Mutex A1 locked from thread1 
       Mutex R0 unlocking from thread master 
>>>>>> Thread 1 obtained available mutex... 
       Mutex R0 unlocked from thread master 
       Mutex R0 locked from thread0 
       Mutex A1 unlocking from thread1 
>>>>>> Thread 0 obtained running mutex... 
       Mutex A1 unlocked from thread1 
>>>>>> Thread 0 notified that evaluation terminated 
>>>>>> Thread 1 released available mutex... 
       Mutex R0 unlocking from thread0 
       Mutex R1 locking from thread1 
       Mutex R0 unlocked from thread0 
>>>>>> Thread 0 released running mutex... 
       Mutex A0 locking from thread0 
       Mutex A0 locked from thread0 
>>>>>> Thread 0 obtained available mutex... 
       Mutex A0 unlocking from thread0 
       Mutex A0 unlocked from thread0 
>>>>>> Thread 0 is terminating... 
       Mutex R1 unlocking from thread master 
       Mutex R1 unlocked from thread master 
       Mutex R1 locked from thread1 
>>>>>> Thread 1 obtained running mutex... 
>>>>>> Thread 1 notified that evaluation terminated 
       Mutex R1 unlocking from thread1 
       Mutex R1 unlocked from thread1 
>>>>>> Thread 1 released running mutex... 
       Mutex A1 locking from thread1 
       Mutex A1 locked from thread1 
>>>>>> Thread 1 obtained available mutex... 
       Mutex A1 unlocking from thread1 
       Mutex A1 unlocked from thread1 
>>>>>> Thread 1 is terminating... 

在Windows 7系統編譯時,無論使用Visual Studio 64位出現的問題,並與MinGW的32位。從 之前的日誌中可以看到,開始時存在死鎖。這對我們來說顯得非常奇怪,不能通過來自不同線程的互斥日誌來解釋。 關於如何調試這個問題的一些建議?

---> LOG FROM WINDOWS 7... 


--------------------------------- 
Number of data: 2 
Data: 0, links: 
Data: 1, links: 

-———> OF_SmartThreads::init_function -- 
------------------------------------ 
--> 4 processors/cores detected. 
--> n_max_threads = 2 
------------------------------------ 
           Mutex R0 locking from thread master 
           Mutex R0 locked from thread master 
           Mutex R0 try locking from thread master 
         OF_SMART_THREADS: Thread 0 failed to obtain running mutex... 
           Mutex A0 unlocking from thread master 
           Mutex A0 unlocked from thread master 
New thread 0 created 
           Mutex A0 unlocking from thread0 
           Mutex A0 unlocked from thread0 
           Mutex R1 locking from thread master 
           Mutex R1 locked from thread master 
>>>>>> Thread 0 released available mutex... 
           Mutex R0 locking from thread0 
           Mutex R1 try locking from thread master 
         OF_SMART_THREADS: Thread 1 failed to obtain running mutex... 
           Mutex A1 unlocking from thread master 
           Mutex A1 unlocked from thread master 
New thread 1 created 
           Mutex A1 unlocking from thread1 
           Mutex A1 unlocked from thread1 
--------------------------------- 
Available threads: 2 
>>>>>> Thread 1 released available mutex... 
           Mutex R1 locking from thread1 
Unlocked datasets: 2 

---> OF_SmartThreads::process_data_function 


UNLOCKED DATASETS : 0 1 
LOCKED DATASETS : 
ACTIVE DATASETS : 
RUNNING THREADS : 
     OF_SMART_THREADS: THREADS AVAILABLE 
     Number of available threads: 2 
     OF_SMART_THREADS: take the thread 0 
     OF_SMART_THREADS: Thread master try to lock available mutex... 0 
           Mutex A0 locking from thread master 
           Mutex A0 locked from thread master 
     OF_SMART_THREADS: Thread obtained available mutex... 0 
     OF_SMART_THREADS: Thread try to unlock running mutex... 0 
           Mutex R0 unlocking from thread master 
           Mutex R0 unlocked from thread master 
           Mutex R0 locked from thread0 
     OF_SMART_THREADS: Thread released running mutex... 0 
>>>>>> Thread 0 obtained running mutex... 
>>>>>> Thread 0 is going to process the dataset 0 
Process Data: delay 41 

         OF_SMART_THREADS: PREPARE AVAILABLE THREADS 
>>>>>> Thread 0 terminated to process the dataset 0 
>>>>>> Thread 0 notified that evaluation terminated 
           Mutex R1 try locking from thread master 
         OF_SMART_THREADS: Thread 1 failed to obtain running mutex... 
           Mutex A1 unlocking from thread master 
           Mutex A1 unlocked from thread master 
           Mutex R0 unlocking from thread0 
           Mutex R0 unlocked from thread0 
         OF_SMART_THREADS: Thread 1 released available mutex... 

UNLOCKED DATASETS : 1 
LOCKED DATASETS : 
ACTIVE DATASETS : 0* 
RUNNING THREADS : 0->0 
>>>>>> Thread 0 released running mutex... 
           Mutex A0 locking from thread0 
     OF_SMART_THREADS: THREADS AVAILABLE 
     Number of available threads: 1 
     OF_SMART_THREADS: take the thread 1 
     OF_SMART_THREADS: Thread master try to lock available mutex... 1 
           Mutex A1 locking from thread master 

有一個僵局,主線程無法鎖定互斥A1,但是,你可以看到從日誌,沒有其他線程之前鎖定該互斥體。有關如何調試此問題的建議?

問候

+0

你不能證明有或沒有死鎖的'cout's。其中一些是同步的,有些則不是。您無法保證打印一半調試的順序。 – Dariusz

+0

死鎖是由執行掛起的事實所證明的。關於同步:我刪除了另一個互斥體以同步代碼中的cout以提高可讀性,結果相同 –

回答

1

添加鎖監視你的OF_bmutex,像bool locked。您不應該解鎖未鎖定的互斥鎖,或鎖定已鎖定的互斥鎖 - 所以放置assert。這似乎是你的init_function沒有事先鎖定OF_AVAILABLE_THREADS_MUTEX[i]->unlock();

Boost BasicLockable Concept

m.unlock(); 

要求:當前線程擁有米

所以看起來你違反了unlock()先決條件。這可以看出你的日誌:

Mutex A0 unlocking from thread master 
Mutex A0 unlocked from thread master 
+0

謝謝Evgeny,我將修復不良的初始化過程。我不清楚布爾變量和assert語句如何幫助我調試... –

+0

@GabrieleCuccolini將標誌'bool locked;'添加到'OF_bmutex'。在構造函數中將它設置爲'false'。在'lock()'添加代碼'assert(!locked); locked = true;',在解鎖中添加'assert(locked); locked = false;'等 –