我們正在開發一個項目來生成一個通用線程類,以允許我們處理一組相互關聯的數據。 其基本思想是在不同的線程中評估只有未連接和可以同時處理的數據集。 我們基於boost :: thread和基於boost :: mutex的OF_bmutex類開發了一個ThreadClass,以執行日誌記錄操作。Windows上的線程升級
代碼的方案是在鏈接的PDF(http://cdm.unimore.it/dep/test.pd),而主類的骨架低於...
// encapsulate boost::mutex to log...
class OF_bmutex{
public:
std::string mutex_type;
int m_id;
boost::mutex m;
void lock(){
std::cout << "Mutex " << mutex_type << m_id << " locking from " << boost::this_thread::get_id() << std::endl;
m.lock();
std::cout << "Mutex " << mutex_type << m_id << " locked from " << boost::this_thread::get_id() << std::endl;
}
void unlock(){
std::cout << "Mutex " << mutex_type << m_id << " unlocking from " << boost::this_thread::get_id() << std::endl;
m.unlock();
std::cout << "Mutex " << mutex_type << m_id << " unlocked from " << boost::this_thread::get_id() << std::endl;
}
bool try_lock(){
std::cout << "Mutex " << mutex_type << m_id << " try locking from " << boost::this_thread::get_id() << std::endl;
bool ret = m.try_lock();
if(ret){
std::cout << "Mutex " << mutex_type << m_id << " try locked from " << boost::this_thread::get_id() << std::endl;
}
return(ret);
}
};
// My thread class
class OF_ThreadClass {
private:
//! running variable
bool running;
//! The thread executing this process...
boost::thread *m_thread;
//! The data to process...
LinkedDataSet *my_data;
//! The id of this thread
int thread_id;
//! Process the data...
virtual int processData();
public:
//! The boost thread id
boost::thread::id boost_id;
//! Thread function
void operator()();
//! Default constructor
OF_ThreadClass();
//! Destructor
~OF_ThreadClass();
//! Connect this thread with the process data to evaluate
void setProcessData(DataToProcess *pd);
//! return the thread id
int getId() const { return this->thread_id; }
//! post process the thread...
void post_process();
};
// The core function with the execution point of the Thread class...
void OF_ThreadClass::operator()(){
while(this->running){
OF_AVAILABLE_THREADS_MUTEX[ this->thread_id ]->unlock();
OF_RUNNING_THREADS_MUTEX[ this->thread_id ]->lock();
// PUT HERE OUR CODE...
if(running == true){
if(my_data != NULL){
this->processData();
}
this->my_data->done = true;
}
std::cout << ">>>>>> Thread " << thread_id << " notified that evaluation terminated\n";
OF_RUNNING_THREADS_MUTEX[ this->thread_id ]->unlock();
OF_AVAILABLE_THREADS_MUTEX[ this->thread_id ]->lock();
}
OF_AVAILABLE_THREADS_MUTEX[ this->thread_id ]->unlock();
}
// A class to perform multithread calculation...
class OF_SmartThreads{
private:
//! The number of threads to activate
int _n_threads;
//! The polling time
int _polling_time;
//! The thread pool...
std::vector< OF_ThreadClass *> threadPool;
//! The stack of the available threads
std::set< OF_ThreadClass *> *OF_AVAILABLE_THREADS;
//! The set of the running threads
std::set< OF_ThreadClass *> OF_RUNNING_THREADS;
//! The set of the locked datasets
std::set< LinkedDataSet* > locked_data;
//! The set of the available datasets
std::set< LinkedDataSet* > unlocked_data;
//! The set of the datasets under processing
std::set< LinkedDataSet* > processing_data;
//! The size of the progress bar
int progBarDim;
public:
//! Constructor
OF_SmartThreads();
//! Destructor
~OF_SmartThreads();
//! Initialize the SmartThreads
int init_function(std::list< LinkedDataSet * > *dList, int n_max_threads);
//! Initialize the SmartThreads
int init_function(std::set< LinkedDataSet * > *dSet, int n_max_threads);
//! Process all the cuncurrent threads..
int process_data();
//! Process all the cuncurrent threads..
int process_data_polling(int polling_time);
//! stop the process..
int post_process();
};
// Initialization function...
int OF_SmartThreads::init_function(...){
// in the main thread...
// Fill the pool of thread mutex...
for(int i = 0; i< _n_threads; i++){
_tm = new OF_BMUTEX;
_tm->mutex_type.assign("A");
_tm->m_id = i;
OF_AVAILABLE_THREADS_MUTEX.push_back(_tm);
_tm = new OF_BMUTEX;
_tm->mutex_type.assign("R");
_tm->m_id = i;
OF_RUNNING_THREADS_MUTEX.push_back(_tm);
}
// Create the threads...
threadPool.resize(_n_threads);
for(int i = 0; i< _n_threads; i++){
// ...preventivally lock the resources...
OF_RUNNING_THREADS_MUTEX[i]->lock();
OF_AVAILABLE_THREADS_MUTEX[i]->unlock();
// ..create the new thread...
pc = new OF_ThreadClass;
// insert the new thread in the list...
threadPool.at(pc->getId()) = pc;
// set it as available...
OF_AVAILABLE_THREADS->insert(pc);
}
}
// Execution function...
void process_data_polling(int _polling_time){
while (running){
if (something_changed){
//Print the status on the screen...
...
}
something_changed = false;
// Poll the status of the processing data periodically
boost::this_thread::sleep(boost::posix_time::millisec(_polling_time));
// Are there some data ready to process?
if(OF_UNLOCKED_DATASETS->size() > 0){
// Take the first
pd = *OF_UNLOCKED_DATASETS->begin();
// are there some threads available?
if(OF_AVAILABLE_THREADS->size() != 0){
//...lock and move the datasets linked to pd...
ret = lock_data(pd, LOCK);
std::cout << "\tNumber of available threads: " << OF_AVAILABLE_THREADS->size() << std::endl;
// Take the available thread...
pc = *OF_AVAILABLE_THREADS->begin();
// ...link it the dataset to process...
pc->setProcess(pd);
OF_AVAILABLE_THREADS_MUTEX[ pc->getId() ]->lock();
OF_RUNNING_THREADS_MUTEX[ pc->getId() ]->unlock();
something_changed = true;
} // available threads
} // unlock datasets
// Find, unlock and remove finished datasets...
pIter2 = OF_RUNNING_THREADS->begin();
pEnd2 = OF_RUNNING_THREADS->end();
while(pIter2 != pEnd2){
pc = *pIter2++;
pd = pc->getDataSet();
if(pd->isDone()){
//...unlock and move the datasets linked to the current dataset...
ret_move = lock_data(pd, RELEASE_LOCK);
//...remove the data from the active set
ret_remove = OF_ACTIVE_DATASETS->erase(pd);
// make the threads available
moveThreads(pc, _RUNNING_, _AVAILABLE_);
something_changed = true;
}
}
pIter2 = OF_AVAILABLE_THREADS->begin();
pEnd2 = OF_AVAILABLE_THREADS->end();
while(pIter2 != pEnd2){
pc = *pIter2++;
bool obtained = OF_RUNNING_THREADS_MUTEX[ pc->getId() ]->try_lock();
if(obtained){
std::cout << "\t\t\tOF_SMART_THREADS: Thread " << pc->getId() << " obtained running mutex..." << std::endl;
}
else{
std::cout << "\t\t\tOF_SMART_THREADS: Thread " << pc->getId() << " failed to obtain running mutex..." << std::endl;
}
OF_AVAILABLE_THREADS_MUTEX[ pc->getId() ]->unlock();
std::cout << "\t\t\tOF_SMART_THREADS: Thread " << pc->getId() << " released available mutex..." << std::endl;
}
if( (OF_LOCKED_DATASETS->size() + OF_UNLOCKED_DATASETS->size() + OF_ACTIVE_DATASETS->size()) > 0){
running = true;
}
else{
running = false;
}
} // end running...
}
// The main function...
int main(int argc, char* argv[]) {
init_function(&data, INT_MAX);
process_data_polling(100);
lc.post_process();
return 0;
}
所有提升1.53編譯的Linux和OSX完全在系統功能。使用的線程數爲2.日誌摘錄如下。 注意從適當的線程發出的互斥日誌...
---> LOG FROM OSX ...
---------------------------------
Number of data: 2
Data: 0, links:
Data: 1, links:
---> OF_SmartThreads::init_function --
------------------------------------
--> 8 processors/cores detected.
--> n_max_threads = 2
------------------------------------
Mutex R0 locking from thread master
Mutex R0 locked from thread master
Mutex R0 try locking from thread master
OF_SMART_THREADS: Thread 0 failed to obtain running mutex...
Mutex A0 unlocking from thread master
Mutex A0 unlocked from thread master
New thread 0 created
Mutex R1 locking from thread master
Mutex R1 locked from thread master
Mutex R1 try locking from thread master
OF_SMART_THREADS: Thread 1 failed to obtain running mutex...
Mutex A1 unlocking from thread master
Mutex A1 unlocked from thread master
New thread 1 created
---------------------------------
Available threads: 2
Unlocked datasets: 2
---> OF_SmartThreads::process_data_function
Mutex A1 unlocking from thread1
Mutex A1 unlocked from thread1
>>>>>> Thread 1 released available mutex...
Mutex R1 locking from thread1
Mutex A0 unlocking from thread0
Mutex A0 unlocked from thread0
>>>>>> Thread 0 released available mutex...
Mutex R0 locking from thread0
UNLOCKED DATASETS : 0 1
LOCKED DATASETS :
ACTIVE DATASETS :
RUNNING THREADS :
OF_SMART_THREADS: THREADS AVAILABLE
Number of available threads: 2
OF_SMART_THREADS: take the thread 0
OF_SMART_THREADS: Thread master try to lock available mutex... 0
Mutex A0 locking from thread master
Mutex A0 locked from thread master
OF_SMART_THREADS: Thread obtained available mutex... 0
OF_SMART_THREADS: Thread try to unlock running mutex... 0
Mutex R0 unlocking from thread master
Mutex R0 unlocked from thread master
OF_SMART_THREADS: Thread released running mutex... 0
OF_SMART_THREADS: PREPARE AVAILABLE THREADS
Mutex R1 try locking from thread master
OF_SMART_THREADS: Thread 1 failed to obtain running mutex...
Mutex A1 unlocking from thread master
Mutex A1 unlocked from thread master
OF_SMART_THREADS: Thread 1 released available mutex...
UNLOCKED DATASETS : 1
LOCKED DATASETS :
ACTIVE DATASETS : 0
RUNNING THREADS : 0->0
Mutex R0 locked from thread0
>>>>>> Thread 0 obtained running mutex...
>>>>>> Thread 0 is going to process the dataset 0
>>>>>> Thread 0 terminated to process the dataset 0
>>>>>> Thread 0 notified that evaluation terminated
Mutex R0 unlocking from thread0
Mutex R0 unlocked from thread0
>>>>>> Thread 0 released running mutex...
Mutex A0 locking from thread0
OF_SMART_THREADS: THREADS AVAILABLE
Number of available threads: 1
OF_SMART_THREADS: take the thread 1
OF_SMART_THREADS: Thread master try to lock available mutex... 1
Mutex A1 locking from thread master
Mutex A1 locked from thread master
OF_SMART_THREADS: Thread obtained available mutex... 1
OF_SMART_THREADS: Thread try to unlock running mutex... 1
Mutex R1 unlocking from thread master
Mutex R1 unlocked from thread master
OF_SMART_THREADS: Thread released running mutex... 1
OF_SMART_THREADS: CHECK THREADS DONE
------------> DATASETS 0 done...
------------> DATASETS 0 removed from the active set.
OF_SMART_THREADS: PREPARE AVAILABLE THREADS
Mutex R0 try locking from thread master
Mutex R0 try locked from thread master
OF_SMART_THREADS: Thread 0 obtained running mutex...
Mutex R1 locked from thread1
Mutex A0 unlocking from thread master
>>>>>> Thread 1 obtained running mutex...
Mutex A0 unlocked from thread master
>>>>>> Thread 1 is going to process the dataset 1
Mutex A0 locked from thread0
OF_SMART_THREADS: Thread 0 released available mutex...
>>>>>> Thread 0 obtained available mutex...
UNLOCKED DATASETS :
LOCKED DATASETS :
ACTIVE DATASETS : 1
RUNNING THREADS : 1->1
>>>>>> Thread 1 terminated to process the dataset 1
Mutex A0 unlocking from thread0
>>>>>> Thread 1 notified that evaluation terminated
Mutex A0 unlocked from thread0
Mutex R1 unlocking from thread1
>>>>>> Thread 0 released available mutex...
Mutex R1 unlocked from thread1
Mutex R0 locking from thread0
>>>>>> Thread 1 released running mutex...
Mutex A1 locking from thread1
OF_SMART_THREADS: CHECK THREADS DONE
------------> DATASETS 1 done...
------------> DATASETS 1 removed from the active set.
OF_SMART_THREADS: PREPARE AVAILABLE THREADS
Mutex R0 try locking from thread master
OF_SMART_THREADS: Thread 0 failed to obtain running mutex...
Mutex A0 unlocking from thread master
Mutex A0 unlocked from thread master
OF_SMART_THREADS: Thread 0 released available mutex...
Mutex R1 try locking from thread master
Mutex R1 try locked from thread master
OF_SMART_THREADS: Thread 1 obtained running mutex...
Mutex A1 unlocking from thread master
Mutex A1 unlocked from thread master
OF_SMART_THREADS: Thread 1 released available mutex...
OF_SMART_THREADS: ALL THE DATASETS HAS BEEN SUCCESFULLY PROCESSED...
Mutex A1 locked from thread1
Mutex R0 unlocking from thread master
>>>>>> Thread 1 obtained available mutex...
Mutex R0 unlocked from thread master
Mutex R0 locked from thread0
Mutex A1 unlocking from thread1
>>>>>> Thread 0 obtained running mutex...
Mutex A1 unlocked from thread1
>>>>>> Thread 0 notified that evaluation terminated
>>>>>> Thread 1 released available mutex...
Mutex R0 unlocking from thread0
Mutex R1 locking from thread1
Mutex R0 unlocked from thread0
>>>>>> Thread 0 released running mutex...
Mutex A0 locking from thread0
Mutex A0 locked from thread0
>>>>>> Thread 0 obtained available mutex...
Mutex A0 unlocking from thread0
Mutex A0 unlocked from thread0
>>>>>> Thread 0 is terminating...
Mutex R1 unlocking from thread master
Mutex R1 unlocked from thread master
Mutex R1 locked from thread1
>>>>>> Thread 1 obtained running mutex...
>>>>>> Thread 1 notified that evaluation terminated
Mutex R1 unlocking from thread1
Mutex R1 unlocked from thread1
>>>>>> Thread 1 released running mutex...
Mutex A1 locking from thread1
Mutex A1 locked from thread1
>>>>>> Thread 1 obtained available mutex...
Mutex A1 unlocking from thread1
Mutex A1 unlocked from thread1
>>>>>> Thread 1 is terminating...
在Windows 7系統編譯時,無論使用Visual Studio 64位出現的問題,並與MinGW的32位。從 之前的日誌中可以看到,開始時存在死鎖。這對我們來說顯得非常奇怪,不能通過來自不同線程的互斥日誌來解釋。 關於如何調試這個問題的一些建議?
---> LOG FROM WINDOWS 7...
---------------------------------
Number of data: 2
Data: 0, links:
Data: 1, links:
-———> OF_SmartThreads::init_function --
------------------------------------
--> 4 processors/cores detected.
--> n_max_threads = 2
------------------------------------
Mutex R0 locking from thread master
Mutex R0 locked from thread master
Mutex R0 try locking from thread master
OF_SMART_THREADS: Thread 0 failed to obtain running mutex...
Mutex A0 unlocking from thread master
Mutex A0 unlocked from thread master
New thread 0 created
Mutex A0 unlocking from thread0
Mutex A0 unlocked from thread0
Mutex R1 locking from thread master
Mutex R1 locked from thread master
>>>>>> Thread 0 released available mutex...
Mutex R0 locking from thread0
Mutex R1 try locking from thread master
OF_SMART_THREADS: Thread 1 failed to obtain running mutex...
Mutex A1 unlocking from thread master
Mutex A1 unlocked from thread master
New thread 1 created
Mutex A1 unlocking from thread1
Mutex A1 unlocked from thread1
---------------------------------
Available threads: 2
>>>>>> Thread 1 released available mutex...
Mutex R1 locking from thread1
Unlocked datasets: 2
---> OF_SmartThreads::process_data_function
UNLOCKED DATASETS : 0 1
LOCKED DATASETS :
ACTIVE DATASETS :
RUNNING THREADS :
OF_SMART_THREADS: THREADS AVAILABLE
Number of available threads: 2
OF_SMART_THREADS: take the thread 0
OF_SMART_THREADS: Thread master try to lock available mutex... 0
Mutex A0 locking from thread master
Mutex A0 locked from thread master
OF_SMART_THREADS: Thread obtained available mutex... 0
OF_SMART_THREADS: Thread try to unlock running mutex... 0
Mutex R0 unlocking from thread master
Mutex R0 unlocked from thread master
Mutex R0 locked from thread0
OF_SMART_THREADS: Thread released running mutex... 0
>>>>>> Thread 0 obtained running mutex...
>>>>>> Thread 0 is going to process the dataset 0
Process Data: delay 41
OF_SMART_THREADS: PREPARE AVAILABLE THREADS
>>>>>> Thread 0 terminated to process the dataset 0
>>>>>> Thread 0 notified that evaluation terminated
Mutex R1 try locking from thread master
OF_SMART_THREADS: Thread 1 failed to obtain running mutex...
Mutex A1 unlocking from thread master
Mutex A1 unlocked from thread master
Mutex R0 unlocking from thread0
Mutex R0 unlocked from thread0
OF_SMART_THREADS: Thread 1 released available mutex...
UNLOCKED DATASETS : 1
LOCKED DATASETS :
ACTIVE DATASETS : 0*
RUNNING THREADS : 0->0
>>>>>> Thread 0 released running mutex...
Mutex A0 locking from thread0
OF_SMART_THREADS: THREADS AVAILABLE
Number of available threads: 1
OF_SMART_THREADS: take the thread 1
OF_SMART_THREADS: Thread master try to lock available mutex... 1
Mutex A1 locking from thread master
有一個僵局,主線程無法鎖定互斥A1,但是,你可以看到從日誌,沒有其他線程之前鎖定該互斥體。有關如何調試此問題的建議?
問候
你不能證明有或沒有死鎖的'cout's。其中一些是同步的,有些則不是。您無法保證打印一半調試的順序。 – Dariusz
死鎖是由執行掛起的事實所證明的。關於同步:我刪除了另一個互斥體以同步代碼中的cout以提高可讀性,結果相同 –