這是一個線程池:
template<class T>
struct threaded_queue {
using lock = std::unique_lock<std::mutex>;
void push_back(T t) {
{
lock l(m);
data.push_back(std::move(t));
}
cv.notify_one();
}
boost::optional<T> pop_front() {
lock l(m);
cv.wait(l, [this]{ return abort || !data.empty(); });
if (abort) return {};
auto r = std::move(data.back());
data.pop_back();
return std::move(r);
}
void terminate() {
{
lock l(m);
abort = true;
data.clear();
}
cv.notify_all();
}
~threaded_queue()
{
terminate();
}
private:
std::mutex m;
std::deque<T> data;
std::condition_variable cv;
bool abort = false;
};
struct thread_pool {
thread_pool(std::size_t n = 1) { start_thread(n); }
thread_pool(thread_pool&&) = delete;
thread_pool& operator=(thread_pool&&) = delete;
~thread_pool() = default; // or `{ terminate(); }` if you want to abandon some tasks
template<class F, class R=std::result_of_t<F&()>>
std::future<R> queue_task(F task) {
std::packaged_task<R()> p(std::move(task));
auto r = p.get_future();
tasks.push_back(std::move(p));
return r;
}
template<class F, class R=std::result_of_t<F&()>>
std::future<R> run_task(F task) {
if (threads_active() >= total_threads()) {
start_thread();
}
return queue_task(std::move(task));
}
void terminate() {
tasks.terminate();
}
std::size_t threads_active() const {
return active;
}
std::size_t total_threads() const {
return threads.size();
}
void clear_threads() {
terminate();
threads.clear();
}
void start_thread(std::size_t n = 1) {
while(n-->0) {
threads.push_back(
std::async(std::launch::async,
[this]{
while(auto task = tasks.pop_front()) {
++active;
try{
(*task)();
} catch(...) {
--active;
throw;
}
--active;
}
}
)
);
}
}
private:
std::vector<std::future<void>> threads;
threaded_queue<std::packaged_task<void()>> tasks;
std::atomic<std::size_t> active;
};
你給它多線程或者在建或通過start_thread
如何。
然後您queue_task
。這會返回一個std::future
,告訴你任務何時完成。
隨着線程完成任務,他們去threaded_queue
並尋找更多。
當threaded_queue
被銷燬時,它會中止其中的所有數據。
當thread_pool
被銷燬時,它會中止所有將來的任務,然後等待所有未完成的任務完成。
Live example。
爲了記錄,總結一個'vector'是通過工作者線程協調任務的一個可怕情況,這些線程急切地從一組共同的值中抽取出來;要做的工作量很小,同步的成本只能確保一次計算的每個值都很高。預先對數據進行分區更有意義,因爲它消除了對同步的需求(除了在結合其結果之前等待所有線程完成),並且可以預測每個線程的數據訪問模式(適用於任何內存系統預取啓發式)。 – ShadowRanger