我有一個簡單的線程池實現。並執行如下 -雖然推送成功,但線程池隊列沒有被推送項更新
問題是,當我插入一個項目在隊列中它不會得到反映在線程池函數void worker_thread()即檢查worker_thread()中的工作隊列。 fucntion總是失敗,任務不會被線程拾取。 你能指出我做錯了什麼嗎?
在此先感謝。
template<typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue()
{}
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(std::move(new_value));
data_cond.notify_one();
}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty(); });
value = std::move(data_queue.front());
data_queue.pop();
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty(); });
std::shared_ptr<T> res(
std::make_shared<T>(std::move(data_queue.front())));
data_queue.pop();
return res;
}
bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return false;
value = std::move(data_queue.front());
data_queue.pop();
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res(
std::make_shared<T>(std::move(data_queue.front())));
data_queue.pop();
return res;
}
bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
class thread_pool
{
threadsafe_queue<std::function<void()>> work_queue;
std::atomic<bool> done;
int thread_count;
std::vector<std::thread> threads;
void worker_thread()
{
while (!done)
{
std::function<void()> task;
if (work_queue.try_pop(task)) // here work_queue is always empty.
{
task();
}
else
{
std::this_thread::yield();
}
}
}
public:
thread_pool(unsigned thread_count = 5) : done(false), thread_count(thread_count)
{
try
{
for (unsigned i = 0; i < thread_count; ++i)
{
threads.emplace_back(std::thread(&thread_pool::worker_thread, this));
}
}
catch (...)
{
done = true;
throw;
}
}
~thread_pool()
{
done = true;
for (unsigned i = 0; i < thread_count; ++i)
{
threads[i].join();
}
}
template <typename FunctionType>
void submit(FunctionType f)
{
work_queue.push(std::function<void()>(f)); // this shows proper size of queue after push.
}
};
void fun()
{
cout << "hi"<<this_thread::get_id(); // this funciton is never being executed by thread pool.
}
template<class T>
class A
{
private:
int x{ 3 };
public:
void fun(vector<string> &v) // this funciton is never being executed by thread pool.
{
std::cout << v[0].c_str() << endl;
x = 5;
}
};
int main()
{
thread_pool tp(2);
vector<string> v{ "1", "2" };
A<int> a;
tp.submit([&] { a.fun(std::ref(v)); });
tp.submit < void()>(fun);
std::this_thread::sleep_for(std::chrono::seconds(10));
return 0;
}
爲什麼你的工作線程自旋產生,他們爲什麼不這樣做了'wait_and_pop'?其次,在你的隊列中使用'abort()'功能是很有用的,它可以用來告訴線程隊列過期。通常,在沒有工作的情況下,不應該做任何工作:即使沒有工作要做,每個工作者線程也會爲您提供1個CPU。 – Yakk