2012-06-20

I am implementing a boost::asio server using an io_service with a thread pool (as in the HTTP Server 3 example). The io_service will be bound to a Unix domain socket, and requests arriving over connections on this socket will be handed off to different threads. To reduce resource consumption, I would like to make the thread pool dynamic.

Here is the concept. Initially a single thread is created. When a request arrives and the server finds that there is no free thread in the pool, it creates a new thread and hands the request to it. The server can create up to some maximum number of threads. Ideally, it would also be able to suspend threads that have been idle for some period of time.

Has anyone built something similar, or does anyone have a relevant example?

As for me, I suppose I would have to somehow override io_service.dispatch to achieve this.

Answer


There are a few challenges with the initial approach:

  • boost::asio::io_service is not intended to be derived from or reimplemented. Note the absence of virtual functions.
  • If the threading library does not provide the ability to query a thread's state, then the state information needs to be managed separately.
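As a sketch of what such separately managed state could look like (the names here are illustrative, and std:: primitives are used instead of boost for brevity): each worker marks itself busy for the duration of a handler, and a monitor can then ask whether every thread in the pool is occupied.

```cpp
#include <atomic>

// Illustrative bookkeeping: a shared counter of workers currently
// executing a handler. Each handler body holds a busy_guard.
std::atomic<unsigned> busy_workers{0};

struct busy_guard {
    busy_guard()  { busy_workers.fetch_add(1); }
    ~busy_guard() { busy_workers.fetch_sub(1); }
};

// True when no idle thread remains to pick up new work.
bool pool_saturated(unsigned pool_size) {
    return busy_workers.load() >= pool_size;
}
```

Every handler would need to create a busy_guard as its first statement, which is exactly the kind of bookkeeping scattered through the code that the latency-based approach avoids.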

An alternative solution is to post a job into the io_service and then record how long it sat in the io_service. If the time delta between when it was ready to run and when it actually ran is above a certain threshold, this indicates there are more jobs in the queue than threads servicing the queue. A major benefit to this is that the dynamic thread pool growth logic is decoupled from the other logic.

Here is an example that accomplishes this using a deadline_timer.

  • Set a deadline_timer to expire 3 seconds from now.
  • Asynchronously wait on the deadline_timer. The handler will be ready to run 3 seconds after the deadline_timer is set.
  • In the asynchronous handler, check the current time relative to when the timer was set to expire. If it is more than 2 seconds late, the io_service queue is backing up, so add a thread to the thread pool.

Example:

#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include <boost/noncopyable.hpp> 
#include <boost/thread.hpp> 
#include <iostream> 

class thread_pool_checker 
    : private boost::noncopyable 
{ 
public: 

    thread_pool_checker(boost::asio::io_service& io_service, 
         boost::thread_group& threads, 
         unsigned int max_threads, 
         long threshold_seconds, 
         long periodic_seconds) 
    : io_service_(io_service), 
     timer_(io_service), 
     threads_(threads), 
     max_threads_(max_threads), 
     threshold_seconds_(threshold_seconds), 
     periodic_seconds_(periodic_seconds) 
    { 
     schedule_check(); 
    } 

private: 

    void schedule_check(); 
    void on_check(const boost::system::error_code& error); 

private: 

    boost::asio::io_service& io_service_; 
    boost::asio::deadline_timer timer_; 
    boost::thread_group&  threads_; 
    unsigned int    max_threads_; 
    long      threshold_seconds_; 
    long      periodic_seconds_; 
}; 

void thread_pool_checker::schedule_check() 
{ 
    // Thread pool is already at max size. 
    if (max_threads_ <= threads_.size()) 
    { 
    std::cout << "Thread pool has reached its max. Example will shutdown." 
       << std::endl; 
    io_service_.stop(); 
    return; 
    } 

    // Schedule check to see if pool needs to increase. 
    std::cout << "Will check if pool needs to increase in " 
      << periodic_seconds_ << " seconds." << std::endl; 
    timer_.expires_from_now(boost::posix_time::seconds(periodic_seconds_)); 
    timer_.async_wait( 
    boost::bind(&thread_pool_checker::on_check, this, 
       boost::asio::placeholders::error)); 
} 

void thread_pool_checker::on_check(const boost::system::error_code& error) 
{ 
    // On error, return early. 
    if (error) return; 

    // Check how long this job was waiting in the service queue. This 
    // returns the expiration time relative to now. Thus, if it expired 
    // 7 seconds ago, then the delta time is -7 seconds. 
    boost::posix_time::time_duration delta = timer_.expires_from_now(); 
    long wait_in_seconds = -delta.total_seconds(); 

    // If the time delta is greater than the threshold, then the job 
    // remained in the service queue for too long, so increase the 
    // thread pool. 
    std::cout << "Job sat in queue for " 
       << wait_in_seconds << " seconds." << std::endl; 
    if (threshold_seconds_ < wait_in_seconds) 
    { 
    std::cout << "Increasing thread pool." << std::endl; 
    threads_.create_thread(
     boost::bind(&boost::asio::io_service::run, 
        &io_service_)); 
    } 

    // Schedule another pool check. 
    schedule_check(); 
} 

// Busy work functions. 
void busy_work(boost::asio::io_service&, 
       unsigned int); 

void add_busy_work(boost::asio::io_service& io_service, 
        unsigned int count) 
{ 
    io_service.post(
    boost::bind(busy_work, 
       boost::ref(io_service), 
       count)); 
} 

void busy_work(boost::asio::io_service& io_service, 
       unsigned int count) 
{ 
    boost::this_thread::sleep(boost::posix_time::seconds(5)); 

    count += 1; 

    // When the count is 3, spawn additional busy work. 
    if (3 == count) 
    { 
    add_busy_work(io_service, 0); 
    } 
    add_busy_work(io_service, count); 
} 

int main() 
{ 

    // Create io service. 
    boost::asio::io_service io_service; 

    // Add some busy work to the service. 
    add_busy_work(io_service, 0); 

    // Create thread group and thread_pool_checker. 
    boost::thread_group threads; 
    thread_pool_checker checker(io_service, threads, 
           3, // Max pool size. 
           2, // Create thread if job waits for 2 sec. 
           3); // Check if pool needs to grow every 3 sec. 

    // Start running the io service. 
    io_service.run(); 

    threads.join_all(); 

    return 0; 
} 

Output:

Will check if pool needs to increase in 3 seconds. 
Job sat in queue for 7 seconds. 
Increasing thread pool. 
Will check if pool needs to increase in 3 seconds. 
Job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job sat in queue for 4 seconds. 
Increasing thread pool. 
Will check if pool needs to increase in 3 seconds. 
Job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job sat in queue for 3 seconds. 
Increasing thread pool. 
Thread pool has reached its max. Example will shutdown.

If I understand correctly, a busy_work task may wait in the queue for several seconds for the pool check, even when the maximum number of threads has not yet been reached, because new threads are not created in advance. That makes this principle hard to use, since the dynamic behavior should not degrade performance this much. Compared to a static pool, a task should only take longer by the time required to create a new thread. Thanks anyway. – boqapt


@user484936: Your understanding is correct. The pool growth occurs _after_ degradation has been detected; it is one of the simplest ways to converge, and should not "degrade performance". If you want to allocate threads _when_ you know they are needed, then thread state needs to be managed, which introduces overhead for all threads and may require state logic to be scattered throughout the code. If you want to allocate threads _before_ you anticipate they will be needed, then have a dedicated thread post a job into the service and do a timed wait on the response. –
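The probe idea from this comment can be sketched as follows. The job queue below is a toy stand-in for an io_service (with asio one would post() the probe handler instead), and all names are illustrative: a dedicated thread posts a no-op job and waits a bounded time for a worker to run it; a timeout means the queue is backed up.

```cpp
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

// Toy stand-in for the io_service job queue.
std::queue<std::function<void()>> jobs;
std::mutex mtx;
std::condition_variable cv;

void post(std::function<void()> f) {
    std::lock_guard<std::mutex> lock(mtx);
    jobs.push(std::move(f));
    cv.notify_one();
}

// Pop and run one queued job, blocking until one is available
// (a stand-in for io_service::run_one()).
void run_one() {
    std::function<void()> f;
    {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [] { return !jobs.empty(); });
        f = std::move(jobs.front());
        jobs.pop();
    }
    f();
}

// Post a no-op "probe" job and wait a bounded time for a worker to
// run it. Returns false on timeout, i.e. when the queue is backed up.
bool probe_queue(std::chrono::milliseconds timeout) {
    struct state {
        std::mutex m;
        std::condition_variable cv;
        bool done = false;
    };
    auto s = std::make_shared<state>();
    post([s] {
        std::lock_guard<std::mutex> lock(s->m);
        s->done = true;
        s->cv.notify_one();
    });
    std::unique_lock<std::mutex> lock(s->m);
    return s->cv.wait_for(lock, timeout, [s] { return s->done; });
}
```

Because the probe's state is held in a shared_ptr captured by value, a probe that times out can still be executed safely by a worker later.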


I wonder what happens in the case where only a single long-running task is executing: when the timer fires, we unnecessarily add a thread to the pool. If there are actually no more handlers to process at that moment, this approach seems wasteful to me. Correct me if I am wrong. – russoue
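One way to guard against growing on a single long-running job (not part of the original answer; the counter and function names are hypothetical) is for the application to track how many posted jobs have not yet been picked up, since io_service does not expose its queue length, and to grow only when work is actually waiting:

```cpp
#include <atomic>

// Hypothetical counter of jobs posted but not yet started; the
// application must maintain it around every post()/handler pair.
std::atomic<int> waiting_jobs{0};

void note_posted()  { waiting_jobs.fetch_add(1); }  // just before post()
void note_started() { waiting_jobs.fetch_sub(1); }  // first line of each handler

// Grow only if work is actually waiting; a timer firing while one
// long-running job executes over an empty queue adds no thread.
bool should_grow(unsigned current_threads, unsigned max_threads) {
    return current_threads < max_threads && waiting_jobs.load() > 0;
}
```

The thread_pool_checker above could consult such a predicate in on_check before calling threads_.create_thread, at the cost of touching every post site.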