2012-09-17 45 views
2

我正在嘗試使用pthread在C++中實現線程池。我想將與線程管理相關的邏輯封裝在一個擁有這些線程的對象中。這意味着無論何時該對象被銷燬,線程都必須停止並清理。如何安全地破壞C++中的Posix線程池

什麼是停止和銷燬線程的最佳方法?在開始時取消並停止取消是一個很好的解決方案?或者,取消並加入線程最好?看到我的代碼,我會很感激任何相關的評論。

WorkerThreadManager.h:

#include "WorkerThreadManagerInterface.h" 
#include "utils/mutex.h" 
#include <queue> 
#include <semaphore.h> 

#include <iostream> 

class WorkerThreadManager : public WorkerThreadManagerInterface 
{ 
    public: 
     WorkerThreadManager(unsigned threadsNumber = 5); 
     virtual ~WorkerThreadManager(); 

     virtual void PushTask(thread_function_t A_threadFun, result_function_t A_resultFun); 
     void SignalResults(); 

    private: 
     static void* WorkerThread(void* A_data); 

     void PushResult(int A_result, result_function_t A_resultFun); 

     typedef boost::function<void()> signal_function_t; 

     struct worker_thread_data_t 
     { 
      worker_thread_data_t(thread_function_t A_threadFun, result_function_t A_resultFun) : 
       threadFun(A_threadFun), resultFun(A_resultFun) {} 
      worker_thread_data_t() {} 

      thread_function_t  threadFun; 
      result_function_t  resultFun; 
     }; 


     const unsigned      m_threadsNumber; 
     pthread_t*       m_pthreads; 

     utils::Mutex      m_tasksMutex; 
     sem_t        m_tasksSem; 
     std::queue<worker_thread_data_t> m_tasks; 

     utils::Mutex      m_resultsMutex; 
     std::queue<signal_function_t>  m_results; 
}; 

WorkerThreadManager.cpp:

#include "WorkerThreadManager.h" 
#include "gateway_log.h" 
#include <pthread.h> 

/** 
* @brief Creates semaphore and starts threads. 
*/ 
WorkerThreadManager::WorkerThreadManager(unsigned threadsNumber) : m_threadsNumber(threadsNumber) 
{ 
    if (sem_init(&m_tasksSem, 0, 0)) 
    { 
     std::stringstream ss; 
     ss << "Semaphore could not be initialized: " << errno << " - " << strerror(errno); 
     LOG_FATAL(ss); 
     throw std::runtime_error(ss.str()); 
    } 

    m_pthreads = new pthread_t[m_threadsNumber]; 
    for (unsigned i = 0; i < m_threadsNumber; ++i) 
    { 
     int rc = pthread_create(&m_pthreads[i], NULL, WorkerThreadManager::WorkerThread, (void*) this); 
     if(rc) 
     { 
      std::stringstream ss; 
      ss << "Pthread could not be started: " << errno << " - " << strerror(errno); 
      LOG_FATAL(ss.str()); 

      if (sem_destroy(&m_tasksSem)) 
       LOG_ERROR("Semaphore could not be destroyed: " << errno << " - " << strerror(errno)); 

      delete [] m_pthreads; 

      throw std::runtime_error(ss.str()); 
     } 
     else 
     { 
      LOG_DEBUG("Worker thread started " << m_pthreads[i]); 

      if(pthread_detach(m_pthreads[i])) 
       LOG_WARN("Failed to detach worker thread"); 
     } 
    } 
} 

/** 
* @brief Cancels all threads, destroys semaphore 
*/ 
WorkerThreadManager::~WorkerThreadManager() 
{ 
    LOG_DEBUG("~WorkerThreadManager()"); 

    for(unsigned i = 0; i < m_threadsNumber; ++i) 
    { 
     if (pthread_cancel(m_pthreads[i])) 
      LOG_ERROR("Worker thread cancellation failed"); 
    } 

    if (sem_destroy(&m_tasksSem)) 
     LOG_ERROR("Semaphore could not be destroyed: " << errno << " - " << strerror(errno)); 

    delete [] m_pthreads; 
} 

/** 
* @brief Adds new task to queue, so worker threads can 
* @param A_threadFun function which will be executed by thread 
* @param A_resultFun function which will be enqueued for calling with return value of A_threadFun as parameter 
*   after worker thread executes A_threadFun. 
*/ 
void WorkerThreadManager::PushTask(thread_function_t A_threadFun, result_function_t A_resultFun) 
{ 
    utils::ScopedLock mutex(m_tasksMutex); 

    worker_thread_data_t data(A_threadFun, A_resultFun); 
    m_tasks.push(data); 
    sem_post(&m_tasksSem); 
    LOG_DEBUG("Task for worker threads has been added to queue"); 
} 

/** 
* @brief Executes result functions (if there are any) to give feedback 
* to classes which requested task execution in worker thread. 
*/ 
void WorkerThreadManager::SignalResults() 
{ 
    while(true) 
    { 
     signal_function_t signal; 
     { 
      utils::ScopedLock mutex(m_resultsMutex); 
      if(m_results.size()) 
      { 
       signal = m_results.front(); 
       m_results.pop(); 
      } 
      else 
       return; 
     } 

     signal(); 
    } 
} 

/** 
* @brief Enqueues result of function executed in worker thread. 
* @param A_result return value of function executed in worker thread 
* @param A_resultFun function which will be enqueued for calling with A_result as a parameter. 
*/ 
void WorkerThreadManager::PushResult(int A_result, result_function_t A_resultFun) 
{ 
    utils::ScopedLock mutex(m_resultsMutex); 

    signal_function_t signal = boost::bind(A_resultFun, A_result); 
    m_results.push(signal); 
} 


/** 
* @brief worker thread body 
* @param A_data pointer to WorkerThreadManager instance 
*/ 
void* WorkerThreadManager::WorkerThread(void* A_data) 
{ 
    WorkerThreadManager* manager = reinterpret_cast<WorkerThreadManager*>(A_data); 
    LOG_DEBUG("Starting worker thread loop"); 
    while (1) 
    { 
     if (-1 == sem_wait(&manager->m_tasksSem) && errno == EINTR) 
     { 
      LOG_DEBUG("sem_wait interrupted with signal"); 
      continue; 
     } 
     LOG_DEBUG("WorkerThread:::::: about to call lock mutex"); 

     worker_thread_data_t data; 
     { 
      utils::ScopedLock mutex(manager->m_tasksMutex); 
      data = manager->m_tasks.front(); 
      manager->m_results.pop(); 
     } 

     LOG_DEBUG("WorkerThread:::::: about to call resultFun"); 
     int result = data.threadFun(); 
     LOG_DEBUG("WorkerThread:::::: after call resultFun"); 
     pthread_testcancel(); 

     manager->PushResult(result, data.resultFun); 
    } 

    return NULL; 
} 

main.cpp中:

#include "gateway_log.h" 
#include "WorkerThreadManager.h" 
#include <memory> 

class A { 
public: 
    int Fun() { LOG_DEBUG("Fun before sleep"); sleep(8); LOG_DEBUG("Fun after sleep");return 0; } 
    void Result(int a) { LOG_DEBUG("Result: " << a); } 
}; 


int main() 
{ 
    sd::auto_ptr<WorkerThreadManager> workerThreadManager = new WorkerThreadManager; 
    A a; 
    workerThreadManager->PushTask(boost::bind(&A::Fun, &a), boost::bind(&A::Result, &a, _1)); 
    sleep(3); 
    LOG_DEBUG("deleting workerThreadManager"); 
    workerThreadManager.reset();     // <<<--- CRASH 
    LOG_DEBUG("deleted workerThreadManager"); 
    sleep(10); 
    LOG_DEBUG("after sleep");  

    return 0; 
} 

請注意,描述here這段代碼有問題。

+0

作爲一個觀點:'pthread_cancel'是惡魔:)它最終難以控制終止,並且可能會或可能不會很好地與C++一起玩(取決於實現方式)。 http://stackoverflow.com/questions/4760687/cancelling-a-thread-using-pthread-cancel-good-practice-or-bad –

+1

關閉作爲線程池一部分的線程的最佳方式是排隊一個「死亡工作」。當一個線程從隊列中抽取一份工作時,它會檢查它是否是「死亡工作」,如果是,則自行終止。 –

+0

哦,是的,在auto_ptr初始化中,我想念「新WorkerThreadManager」,我很抱歉。當我試圖準備這篇文章時,我複製並粘貼了來自我的應用程序的代碼。我也從一些測試代碼中清除了這個示例,所以在回溯中看到的行號可能與粘貼代碼中的行號不完全相同。 – Marcin

回答

2

關於安全停止:我更喜歡pthread_join。我不使用pthread_cancel - 我正在使用特殊的停止消息,但我總是有事件驅動的線程(意味着有一些消息隊列的線程)。當線程得到exit-message它停止它的循環,然後加入返回到我的main代碼。

關於你的代碼 - 我建議創建class Thread封裝單線程。池應該有堆創建的對象Thread - 就像現在你有pthread_t的數組。如果你需要池和線程之間的同步 - 那麼你不能確定Thread對象被銷燬就不能退出池析構函數。

+0

我應該怎麼叫醒當我想要加入我的工作者線程時?最簡單的解決方案,但可能不是最好的方法是每個工作線程調用'post_sem'一次。 – Marcin

+0

你可以添加一些'exitThread'標誌 - 將它設置爲'true',然後發出信號量多少次,就像線程停止一樣。但是,這只是一個很好的建議,經過一個快速的觀察,不要太認真;) – PiotrNycz