2016-05-26 41 views
0

我有一個簡單的線程池實現。並執行如下 -雖然推送成功,但線程池隊列沒有被推送項更新

問題是,當我插入一個項目在隊列中它不會得到反映在線程池函數void worker_thread()即檢查worker_thread()中的工作隊列。 fucntion總是失敗,任務不會被線程拾取。 你能指出我做錯了什麼嗎?

在此先感謝。

template<typename T> 
class threadsafe_queue 
{ 
private: 
    mutable std::mutex mut; 
    std::queue<T> data_queue; 
    std::condition_variable data_cond; 
public: 
    threadsafe_queue() 
    {} 

    void push(T new_value) 
    { 
     std::lock_guard<std::mutex> lk(mut); 
     data_queue.push(std::move(new_value)); 
     data_cond.notify_one(); 
    } 

    void wait_and_pop(T& value) 
    { 
     std::unique_lock<std::mutex> lk(mut); 
     data_cond.wait(lk, [this]{return !data_queue.empty(); }); 
     value = std::move(data_queue.front()); 
     data_queue.pop(); 
    } 

    std::shared_ptr<T> wait_and_pop() 
    { 
     std::unique_lock<std::mutex> lk(mut); 
     data_cond.wait(lk, [this]{return !data_queue.empty(); }); 
     std::shared_ptr<T> res(
      std::make_shared<T>(std::move(data_queue.front()))); 
     data_queue.pop(); 
     return res; 
    } 

    bool try_pop(T& value) 
    { 
     std::lock_guard<std::mutex> lk(mut); 
     if (data_queue.empty()) 
      return false; 
     value = std::move(data_queue.front()); 
     data_queue.pop(); 
    } 

    std::shared_ptr<T> try_pop() 
    { 
     std::lock_guard<std::mutex> lk(mut); 
     if (data_queue.empty()) 
      return std::shared_ptr<T>(); 
     std::shared_ptr<T> res(
      std::make_shared<T>(std::move(data_queue.front()))); 
     data_queue.pop(); 
     return res; 
    } 

    bool empty() const 
    { 
     std::lock_guard<std::mutex> lk(mut); 
     return data_queue.empty(); 
    } 
}; 

class thread_pool 
{ 
    threadsafe_queue<std::function<void()>> work_queue; 
    std::atomic<bool> done; 
    int thread_count; 
    std::vector<std::thread> threads; 

    void worker_thread() 
    { 
     while (!done) 
     { 
      std::function<void()> task; 
      if (work_queue.try_pop(task)) // here work_queue is always empty. 
      { 
       task(); 
      } 
      else 
      { 
       std::this_thread::yield(); 
      } 
     } 
    } 
public: 
    thread_pool(unsigned thread_count = 5) : done(false), thread_count(thread_count) 
    { 
     try 
     { 
      for (unsigned i = 0; i < thread_count; ++i) 
      { 
       threads.emplace_back(std::thread(&thread_pool::worker_thread, this)); 
      } 
     } 
     catch (...) 
     { 
      done = true; 
      throw; 
     } 
    } 

    ~thread_pool() 
    { 
     done = true; 
     for (unsigned i = 0; i < thread_count; ++i) 
     { 
      threads[i].join(); 
     } 
    } 
    template <typename FunctionType> 
    void submit(FunctionType f) 
    { 
     work_queue.push(std::function<void()>(f)); // this shows proper size of queue after push. 
    } 
}; 

void fun() 
{ 
    cout << "hi"<<this_thread::get_id(); // this funciton is never being executed by thread pool. 
} 
template<class T> 
class A 
{ 
    private: 
    int x{ 3 }; 
public: 
     void fun(vector<string> &v) // this funciton is never being executed by thread pool. 
     { 
     std::cout << v[0].c_str() << endl; 
      x = 5; 
     } 

};

 int main() 
    { 
     thread_pool tp(2); 
     vector<string> v{ "1", "2" }; 
      A<int> a; 
      tp.submit([&] { a.fun(std::ref(v)); }); 
     tp.submit < void()>(fun); 
     std::this_thread::sleep_for(std::chrono::seconds(10)); 
     return 0; 
    } 
+0

爲什麼你的工作線程自旋產生,他們爲什麼不這樣做了'wait_and_pop'?其次,在你的隊列中使用'abort()'功能是很有用的,它可以用來告訴線程隊列過期。通常,在沒有工作的情況下,不應該做任何工作:即使沒有工作要做,每個工作者線程也會爲您提供1個CPU。 – Yakk

回答

4

你缺少return true;語句進行try_pop()方法。你應該編譯啓用了警告和編譯器將指向:

ttt.cpp:在成員函數 '布爾threadsafe_queue :: try_pop(T &) [與T =的std ::功能]':TTT。 CPP:57:5:警告:控制 達到非void函數[-Wreturn型]的結束

+0

謝謝你救了我的一天。 :) – user888270