有可能與最初的方法的幾個挑戰:
boost::asio::io_service
不打算給從中導出或重新實現。請注意缺乏虛擬功能。
- 如果你的線程庫沒有提供查詢線程狀態的能力,那麼狀態信息需要單獨管理。
另一種解決方法是將作業發佈到io_service
,然後檢查它在io_service
中的時間。如果準備運行時間與實際運行時間之間的時間增量高於某個閾值,則表示隊列中的作業數多於服務隊列的線程數。這樣做的主要好處是動態線程池增長邏輯與其他邏輯分離。
以下是使用deadline_timer
完成此操作的示例。
- 設置
deadline_timer
從現在開始到期3
秒。
- 異步等待
deadline_timer
。從設置deadline_timer
時起,處理程序將準備好運行3
秒。
- 在異步處理程序中,檢查相對於計時器何時到期的當前時間。如果它大於
2
秒,則io_service
隊列正在備份,因此請將線程添加到線程池。
例子:
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>
class thread_pool_checker
: private boost::noncopyable
{
public:
thread_pool_checker(boost::asio::io_service& io_service,
boost::thread_group& threads,
unsigned int max_threads,
long threshold_seconds,
long periodic_seconds)
: io_service_(io_service),
timer_(io_service),
threads_(threads),
max_threads_(max_threads),
threshold_seconds_(threshold_seconds),
periodic_seconds_(periodic_seconds)
{
schedule_check();
}
private:
void schedule_check();
void on_check(const boost::system::error_code& error);
private:
boost::asio::io_service& io_service_;
boost::asio::deadline_timer timer_;
boost::thread_group& threads_;
unsigned int max_threads_;
long threshold_seconds_;
long periodic_seconds_;
};
void thread_pool_checker::schedule_check()
{
// Thread pool is already at max size.
if (max_threads_ <= threads_.size())
{
std::cout << "Thread pool has reached its max. Example will shutdown."
<< std::endl;
io_service_.stop();
return;
}
// Schedule check to see if pool needs to increase.
std::cout << "Will check if pool needs to increase in "
<< periodic_seconds_ << " seconds." << std::endl;
timer_.expires_from_now(boost::posix_time::seconds(periodic_seconds_));
timer_.async_wait(
boost::bind(&thread_pool_checker::on_check, this,
boost::asio::placeholders::error));
}
void thread_pool_checker::on_check(const boost::system::error_code& error)
{
// On error, return early.
if (error) return;
// Check how long this job was waiting in the service queue. This
// returns the expiration time relative to now. Thus, if it expired
// 7 seconds ago, then the delta time is -7 seconds.
boost::posix_time::time_duration delta = timer_.expires_from_now();
long wait_in_seconds = -delta.seconds();
// If the time delta is greater than the threshold, then the job
// remained in the service queue for too long, so increase the
// thread pool.
std::cout << "Job job sat in queue for "
<< wait_in_seconds << " seconds." << std::endl;
if (threshold_seconds_ < wait_in_seconds)
{
std::cout << "Increasing thread pool." << std::endl;
threads_.create_thread(
boost::bind(&boost::asio::io_service::run,
&io_service_));
}
// Otherwise, schedule another pool check.
run();
}
// Busy work functions.
void busy_work(boost::asio::io_service&,
unsigned int);
void add_busy_work(boost::asio::io_service& io_service,
unsigned int count)
{
io_service.post(
boost::bind(busy_work,
boost::ref(io_service),
count));
}
void busy_work(boost::asio::io_service& io_service,
unsigned int count)
{
boost::this_thread::sleep(boost::posix_time::seconds(5));
count += 1;
// When the count is 3, spawn additional busy work.
if (3 == count)
{
add_busy_work(io_service, 0);
}
add_busy_work(io_service, count);
}
int main()
{
using boost::asio::ip::tcp;
// Create io service.
boost::asio::io_service io_service;
// Add some busy work to the service.
add_busy_work(io_service, 0);
// Create thread group and thread_pool_checker.
boost::thread_group threads;
thread_pool_checker checker(io_service, threads,
3, // Max pool size.
2, // Create thread if job waits for 2 sec.
3); // Check if pool needs to grow every 3 sec.
// Start running the io service.
io_service.run();
threads.join_all();
return 0;
}
輸出:
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 7 seconds.
Increasing thread pool.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 0 seconds.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 4 seconds.
Increasing thread pool.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 0 seconds.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 0 seconds.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 0 seconds.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 3 seconds.
Increasing thread pool.
Thread pool has reached its max. Example will shutdown.
如果我理解正確的話,busy_work任務可能在等待幾秒鐘隊列以及池檢查,即使最大的線程數量,也沒有由於未提前創建新線程,因此尚未到達。這使得這個原則很難使用,因爲動態特性不應該如此降低性能。它應該使任務執行時間更長,而僅僅需要創建新線程所需的時間,而不是靜態池所需的時間。不管怎樣,謝謝你。 – boqapt
@ user484936:你的理解是正確的。游泳池增長髮生_已被檢測到退化;它是匯聚的最簡單方法之一,不應「降低績效」。如果你想分配線程_當你知道它們是需要的,那麼線程狀態需要被管理,爲所有線程引入開銷,並且可能需要狀態邏輯分散在整個代碼中。如果你想分配線程,你預測它們將被需要,然後有一個專門的線程在服務中發佈一個工作,然後定時等待響應。 –
我想知道在只有一個長時間運行的任務執行的情況下會發生什麼,當我們的計時器觸發時我們不必要地將一個線程添加到池中。如果當時實際上沒有更多的事件需要處理,那麼這種方法對我來說似乎是無效的。如果我錯了,請糾正我。 – russoue