2011-04-27 49 views
4

我實現了我的Ubuntu服務器的簡單的線程池機制(我的多客戶端匿名聊天程序),我需要讓我的工作線程睡眠,直到一個作業(在一個函數指針的形式和參數)需要執行。線程等待家長

我現在的系統正在走出窗口。我(工作者線程)向經理詢問工作是否可用,以及是否有5ms沒有睡眠。如果有,請將作業添加到工作隊列並運行該功能。可憐的週期浪費。

我想要做的是做一個簡單的事件樣的系統。我正在考慮有一個互斥量向量(每個worker都有一個向量),並且在創建時將互斥量的句柄作爲參數傳入。然後在我的經理類(它保存並分發作業)中,每當創建一個線程時,都會鎖定該互斥鎖。當需要執行任務時,解鎖下一個互斥鎖,等待它被鎖定和解鎖,然後重新鎖定它。不過,我想知道是否有更好的方法來達到這個目的。


tldr;所以我的問題是這樣的。讓一個線程等待管理類工作的最有效,最有效和最安全的方法是什麼?輪詢我應該甚至考慮的技術(每次超過1000個客戶端),互斥鎖是否體面?還是有其他技術?

回答

6

你需要的是條件變量。
所有的工作線程調用wait()會暫停它們。

父線程然後戴上一個隊列中的工作項目,並呼籲在條件變量信號。這將喚醒正在睡覺的一條線。它可以從隊列中刪除作業執行作業然後調用等待條件變量返回到休眠狀態。

嘗試:

#include <pthread.h> 
#include <memory> 
#include <list> 

// Use RAII to do the lock/unlock 
struct MutexLock 
{ 
    MutexLock(pthread_mutex_t& m) : mutex(m) { pthread_mutex_lock(&mutex); } 
    ~MutexLock()        { pthread_mutex_unlock(&mutex); } 
    private: 
     pthread_mutex_t& mutex; 
}; 

// The base class of all work we want to do. 
struct Job 
{ 
    virtual void doWork() = 0; 
}; 

// pthreads is a C library the call back must be a C function. 
extern "C" void* threadPoolThreadStart(void*); 

// The very basre minimal part of a thread pool 
// It does not create the workers. You need to create the work threads 
// then make them call workerStart(). I leave that as an exercise for you. 
class ThreadPool 
{ 

    public: 
     ThreadPool(unsigned int threadCount=1); 
     ~ThreadPool(); 

     void addWork(std::auto_ptr<Job> job); 
    private: 

     friend void* threadPoolThreadStart(void*); 
     void workerStart(); 

     std::auto_ptr<Job> getJob(); 

     bool    finished; // Threads will re-wait while this is true. 
     pthread_mutex_t  mutex;  // A lock so that we can sequence accesses. 
     pthread_cond_t  cond;  // The condition variable that is used to hold worker threads. 
     std::list<Job*>  workQueue; // A queue of jobs. 
     std::vector<pthread_t>threads; 
}; 

// Create the thread pool 
ThreadPool::ThreadPool(int unsigned threadCount) 
    : finished(false) 
    , threads(threadCount) 
{ 
    // If we fail creating either pthread object than throw a fit. 
    if (pthread_mutex_init(&mutex, NULL) != 0) 
    { throw int(1); 
    } 

    if (pthread_cond_init(&cond, NULL) != 0) 
    { 
     pthread_mutex_destroy(&mutex); 
     throw int(2); 
    } 
    for(unsigned int loop=0; loop < threadCount;++loop) 
    { 
     if (pthread_create(threads[loop], NULL, threadPoolThreadStart, this) != 0) 
     { 
      // One thread failed: clean up 
      for(unsigned int kill = loop -1; kill < loop /*unsigned will wrap*/;--kill) 
      { 
       pthread_kill(threads[kill], 9); 
      } 
      throw int(3); 
     } 
    } 
} 

// Cleanup any left overs. 
// Note. This does not deal with worker threads. 
//  You need to add a method to flush all worker threads 
//  out of this pobject before you let the destructor destroy it. 
ThreadPool::~ThreadPool() 
{ 
    finished = true; 
    for(std::vector<pthread_t>::iterator loop = threads.begin();loop != threads.end(); ++loop) 
    { 
     // Send enough signals to free all threads. 
     pthread_cond_signal(&cond); 
    } 
    for(std::vector<pthread_t>::iterator loop = threads.begin();loop != threads.end(); ++loop) 
    { 
     // Wait for all threads to exit (they will as finished is true and 
     //        we sent enough signals to make sure 
     //        they are running). 
     void* result; 
     pthread_join(*loop, &result); 
    } 
    // Destroy the pthread objects. 
    pthread_cond_destroy(&cond); 
    pthread_mutex_destroy(&mutex); 

    // Delete all re-maining jobs. 
    // Notice how we took ownership of the jobs. 
    for(std::list<Job*>::const_iterator loop = workQueue.begin(); loop != workQueue.end();++loop) 
    { 
     delete *loop; 
    } 
} 

// Add a new job to the queue 
// Signal the condition variable. This will flush a waiting worker 
// otherwise the job will wait for a worker to finish processing its current job. 
void ThreadPool::addWork(std::auto_ptr<Job> job) 
{ 
    MutexLock lock(mutex); 

    workQueue.push_back(job.release()); 
    pthread_cond_signal(&cond); 
} 

// Start a thread. 
// Make sure no exceptions escape as that is bad. 
void* threadPoolThreadStart(void* data) 
{ 
    ThreadPool* pool = reinterpret_cast<ThreadPool*>(workerStart); 
    try 
    { 
     pool->workerStart(); 
    } 
    catch(...){} 
    return NULL; 
} 

// This is the main worker loop. 
void ThreadPool::workerStart() 
{ 
    while(!finished) 
    { 
     std::auto_ptr<Job> job = getJob(); 
     if (job.get() != NULL) 
     { 
      job->doWork(); 
     } 
    } 
} 

// The workers come here to get a job. 
// If there are non in the queue they are suspended waiting on cond 
// until a new job is added above. 
std::auto_ptr<Job> ThreadPool::getJob() 
{ 
    MutexLock lock(mutex); 

    while((workQueue.empty()) && (!finished)) 
    { 
     pthread_cond_wait(&cond, &mutex); 
     // The wait releases the mutex lock and suspends the thread (until a signal). 
     // When a thread wakes up it is help until it can acquire the mutex so when we 
     // get here the mutex is again locked. 
     // 
     // Note: You must use while() here. This is because of the situation. 
     // Two workers: Worker A processing job A. 
     //     Worker B suspended on condition variable. 
     // Parent adds a new job and calls signal. 
     // This wakes up thread B. But it is possible for Worker A to finish its 
     // work and lock the mutex before the Worker B is released from the above call. 
     // 
     // If that happens then Worker A will see that the queue is not empty 
     // and grab the work item in the queue and start processing. Worker B will 
     // then lock the mutext and proceed here. If the above is not a while then 
     // it would try and remove an item from an empty queue. With a while it sees 
     // that the queue is empty and re-suspends on the condition variable above. 
    } 
    std::auto_ptr<Job> result; 
    if (!finished) 
    { result.reset(workQueue.front()); 
     workQueue.pop_front(); 
    } 

    return result; 
} 
+0

條件變量是一個互斥/信號量? – ultifinitus 2011-04-27 06:08:54

+0

@ultifinitus:No.條件變量是線程代碼中最低級別的基元(以及互斥體)。您可以從互斥/條件變量中構建信號量。 – 2011-04-27 06:28:54

+0

哈哈謝謝!我感謝幫助!我喜歡'我把這當做鍛鍊的一部分。我會告訴你最終產品是如何工作的。 – ultifinitus 2011-04-29 13:19:13

2

與多個消費者(工作線程消耗的工作請求)經典的生產者 - 消費者同步。衆所周知的技術是有一個信號量,每個工作線程down()和每次你有工作請求,做up()。比從互斥鎖工作隊列中選擇請求。由於一個up()只會喚醒一個down(),實際上對互斥鎖的爭用實際上是最小的。

或者,你可以做同樣的條件變量,每個線程執行的等待和喚醒一個,當你有工作。隊列本身仍然與互斥體鎖定(condvar無論如何都需要一個)。

最後我不能完全肯定,但其實我覺得你可以實際使用的管道作爲隊列在內的所有同步(工作線程無非是想「讀(的sizeof(要求))」)。有點亂七八糟,但導致更少的上下文切換。

+0

信號是簡單的解決方案。它通常作爲互斥體實現爲條件變量和整數計數。但是,除此之外,您還必須確保您控制對任何其他共享資源的線程訪問權限(例如待處理工作請求的列表)。 – 2011-04-27 16:55:01

+0

@Martin:我描述了兩種方式,並明確表示隊列必須被鎖定。由於同步原語的等價性,選擇取決於在給定環境中哪個更有效。事實上,這只是實現消息隊列,所以當有一個可用時,就使用它(三個等效的同步原語是信號量,條件變量和消息隊列)。 – 2011-04-28 05:09:05

1

做到這一點,最簡單的方法是semaphores。這是一個信號是如何工作的:

信號量基本上是一個變量,它取空/正值。進程可以通過兩種方式與它進行交互:增加或減少信號量。

增加信號量將1加到這個神奇的變量,就是這樣。它減少了事物變得有趣的數量:如果計數達到零並且進程試圖再次降低它,因爲它不能取負值,它將會阻止塊,直到變量上升

如果多個進程塊正在等待減小信號量值,則每增加一個計數單位,只有一個被喚醒。

這使得創建工作/任務系統變得非常容易:您的經理進程對任務進行排隊並增加信號量的值以匹配其餘項目,並且您的工作進程會不斷減少計數並獲取任務。當沒有可用的任務時,它們將阻塞,並且不消耗CPU時間。當出現時,只有一個休眠過程會喚醒。 Insta-sync魔術。

不幸的是,至少在Unix世界中,信號量API並不是非常友好,因爲它由於某種原因而處理sempahores數組而不是單個數組。但是,你是一個簡單的包裝,遠離一個漂亮的界面!

乾杯!

+0

這幾乎完美!我肯定會做一些大量的研究,謝謝你的迴應! – ultifinitus 2011-04-27 06:09:49

3

通常的做法是讓隊列queue出色的工作,保護隊列的互斥鎖mutex,以及等待條件queue_not_empty。然後,每個工作線程將執行以下操作(使用僞API):

while (true) { 
    Work * work = 0; 
    mutex.lock(); 
    while (queue.empty()) 
     if (!queue_not_empty.wait(&mutex, timeout)) 
      return; // timeout - exit the worker thread 
    work = queue.front(); 
    queue.pop_front(); 
    mutex.unlock(); 
    work->perform(); 
} 

wait(&mutex, timeout)呼叫阻塞,直到任一等待條件被髮信號或呼叫超時。通過的mutexwait()內原子解鎖,並在從呼叫返回之前再次鎖定,以向所有參與者提供隊列的一致視圖。 timeout將被選擇爲相當大(秒),並且會導致線程退出(如果有更多工作進入,線程池將開始新線程)。

同時,線程池的工作插入功能做到這一點:

Work * work = ...; 
mutex.lock(); 
queue.push_back(work); 
if (worker.empty()) 
    start_a_new_worker(); 
queue_not_empty.wake_one(); 
mutex.unlock(); 
+0

這就是我想的,謝謝你的超時,我以爲我需要實施某種模具工作..我很感激! – ultifinitus 2011-04-27 06:07:49

2

由於網絡聊天程序大概是I/O密集型而非CPU綁定的,你並不真正需要的線程。您可以使用諸如Boost.AsioGLib main loop等工具在單個線程中處理所有I/O。這些是針對特定於平臺的功能的可移植抽象,這些功能允許程序阻止(可能很大的)一組打開的文件或套接字等待任意的活動,然後在活動發生時立即喚醒並作出響應。

+0

我一直在單線程select()和poll()上做所有事情,我只是擔心速度......有沒有任何需要? – ultifinitus 2011-04-27 06:06:25

+0

@ultifinitus,除非您的程序執行了大量的CPU工作(例如每個連接的套接字上的SSL加密),否則它可能大部分時間都處於空閒狀態,等待客戶端的輸入,並且在輸入時僅使用CPU到達。在多核內分配這樣的工作量沒有任何好處。 – Wyzard 2011-04-27 06:13:48

+0

@Wyzard:太棒了!我應該有一個備份系統(多線程),你覺得呢?如果你說的是真的,我**完**! – ultifinitus 2011-04-27 06:25:33