2015-06-24 32 views
0

我想通過使用線程池來更新矢量,其中每個工作人員負責更新矢量的某些部分。更具體地說,我將矢量拆分爲具有空交點的子集,其中集的數量由可用於我的計算機(4)的cpus數給出。多線程更新的矢量

每個線程工作者將更新:

std::array<int, arraySize> vec 

如果陣列聲明爲private類MultiThreadedUpdateOfVector內,那麼功能不起作用:每個行業更新的vec的相應部分,但變化不由下一個線程挑選。所以vec的行爲就好像是每個線程的局部變量。

std::array<int, arraySize> vec 

MultiThreadedUpdateOfVector之前宣稱:如果

這個問題就不存在了。

你能解釋這種不需要的行爲嗎?

您可以建議一個解決方案,其中std::array<int, arraySize> vec仍然是MultiThreadedUpdateOfVector的成員?

謝謝!

#include "stdafx.h" 
#include <iostream> 
#include <vector> 
#include <chrono> 
#include <thread> 
#include <array> 

#include "ThreadPool.h" 
using namespace std; 

const int block = 2; 
const int arraySize = 8; 

class MultiThreadedUpdateOfVector 
{ 
public: 
    MultiThreadedUpdateOfVector() 
    { 

    } 
    bool setArray(const int& i, const int& j) 
    { 
     std::thread::id id = std::this_thread::get_id(); 
     for (int kk = i; kk < j; ++kk) 
     { 
      vec[kk] = i * 10000 + j * 100 + kk; 
     } 
     return true; 
    } 
    void print() 
    { 
     for (unsigned kk = 0; kk < vec.size(); ++kk) 
     { 
      std::cout << kk << "  " << vec[kk] << endl; 
     } 
    } 
private: 
    std::array<int, arraySize> vec; 
}; 

int main() 
{ 
    std::thread::id id = std::this_thread::get_id(); 
    ThreadPool pool(4); 
    std::vector<std::future<bool> >results; 

    MultiThreadedUpdateOfVector h; 
    int begin = 0; 
    int end = block; 
    for (int i = 0; i < 4; ++i) { 
      results.push_back(pool.enqueue(&MultiThreadedUpdateOfVector::setArray, h, begin, end)); 
     begin = end + 1; 
     end += block; 
    } 
    for (int i = 0; i < 4; ++i) 
     results[i].get(); 
    h.print(); 
    return 0; 
} 

#include <vector> 
#include <queue> 
#include <memory> 
#include <thread> 
#include <mutex> 
#include <condition_variable> 
#include <future> 
#include <functional> 
#include <stdexcept> 

class ThreadPool { 
public: 
    ThreadPool(size_t); 
    template<class F, class... Args> 
    auto enqueue(F&& f, Args&&... args) 
     ->std::future<typename std::result_of<F(Args...)>::type>; 
    ~ThreadPool(); 
    int getTasksSize() 
    { 
     std::unique_lock<std::mutex> lock(this->queue_mutex_m); 
     std::thread::id id1 = std::this_thread::get_id(); 
     return tasks_m.size(); 
    } 
private: 
    // need to keep track of threads so we can join them 
    std::vector<std::thread> workers_m; 
    // the task queue 
    std::queue< std::function<void()> > tasks_m; 

    // synchronization 
    std::mutex queue_mutex_m; 
    std::condition_variable condition_m; 
    bool stop_m; 
}; 

// the constructor just launches some amount of workers 
inline ThreadPool::ThreadPool(size_t threads) 
    : stop_m(false) 
{ 
    std::thread::id id = std::this_thread::get_id(); 
    for (size_t i = 0; i < threads; ++i) 
    { 
     workers_m.emplace_back(
      [this] 
     { 
      for (;;) 
      { 
       std::function<void()> task; 
       { 
        std::unique_lock<std::mutex> lock(this->queue_mutex_m); 
        std::thread::id id1 = std::this_thread::get_id(); 
        this->condition_m.wait(lock, [this]{ return this->stop_m || !this->tasks_m.empty(); }); 
        std::thread::id id = std::this_thread::get_id(); 
        if (this->stop_m && this->tasks_m.empty()) 
         return; 
        task = std::move(this->tasks_m.front()); 
        this->tasks_m.pop(); 
       } 
       task(); 
      } 
     } 
     ); 
    } 
} 

// add new work item to the pool 
template<class F, class... Args> 
auto ThreadPool::enqueue(F&& f, Args&&... args) 
-> std::future<typename std::result_of<F(Args...)>::type> 
{ 
    std::thread::id id = std::this_thread::get_id(); 
    using return_type = typename std::result_of<F(Args...)>::type; 

    auto task = std::make_shared< std::packaged_task<return_type()> >(
     std::bind(std::forward<F>(f), std::forward<Args>(args)...) 
     ); 

    std::future<return_type> res = task->get_future(); 
    { 
     std::unique_lock<std::mutex> lock(queue_mutex_m); 

     // don't allow enqueueing after stopping the pool 
     if (stop_m) 
      throw std::runtime_error("enqueue on stopped ThreadPool"); 

     std::thread::id id = std::this_thread::get_id(); 
     tasks_m.emplace([task](){ (*task)(); }); 
    } 
    condition_m.notify_one(); 
    return res; 
} 

// the destructor joins all threads 
inline ThreadPool::~ThreadPool() 
{ 
    { 
     std::unique_lock<std::mutex> lock(queue_mutex_m); 
     stop_m = true; 
    } 
    condition_m.notify_all(); 
    for (std::thread &worker : workers_m) 
     worker.join(); 
} 
+0

'const int的&'?這是毫無意義的。 – MSalters

+0

我刪除了const int&,感謝評論。你能回答我的問題嗎? – user2286810

回答

2

是不是你的enqueue存儲一個副本的參數來調用它們?

如果你直接使用thread,這將是問題,你可以使用一個參考包裝—通std::ref(h)而不是h —到線程構造,從而通過引用傳遞h

我認爲你的ThreadPool應該做同樣的事情。 (如果不工作,必須重新設計該ThreadPool需求,以便它的工作)

+0

謝謝Hurkyl,我借用了TreadPool,所以你的問題是相關的。我會嘗試在線程方面更明確地做到這一點,因爲由emplace_back執行的線程構造函數對我來說並不透明。 – user2286810

1

的問題進行了詳細的URL中所描述:

callable with arguments

的解決方案是替代:

pool.enqueue(&MultiThreadedUpdateOfVector::setArray, h, begin, end) 

由:

pool.enqueue(&MultiThreadedUpdateOfVector::setArray, std::ref(h), begin, end)