2017-05-24 32 views
2

我想使用QtConcurrent::map函數來操作QVector。我所有的示例程序不會是1QtConcurrent :: map沒有任何好處

QVector<double> arr(10000000, 0); 
QElapsedTimer timer; 
qDebug() << QThreadPool::globalInstance()->maxThreadCount() << "Threads"; 

int end; 
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
timer.start(); 
for(int i = 0; i < 100; ++i) { 
    std::transform(arr.begin(), arr.end(), arr.begin(), [](double x){ return ++x; }); 
} 
end = timer.elapsed(); 
qDebug() << end; 
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
timer.start(); 
for(int i = 0; i < 100; ++i) { 
    std::for_each(arr.begin(), arr.end(), [](double &x){ ++x; }); 
} 
end = timer.elapsed(); 
qDebug() << end; 
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
timer.start(); 
for(int i = 0; i < 100; ++i) { 
    QFuture<void> qf = QtConcurrent::map(arr.begin(), arr.end(), [](double &x){ ++x; }); 
    qf.waitForFinished(); 
} 
end = timer.elapsed(); 
qDebug() << end; 

然而遞增的QVector所有值方案產出

4 Threads 
905 // std::transform 
886 // std::for_each 
876 // QtConcurrent::map 

所以幾乎與多線程版本沒有速度優勢。我確認實際上有4個線程正在運行。我使用了-O2優化。更常見的QThreadPool方法更適合這種情況嗎?

編輯:

我嘗試使用QtConcurrent::run()一個differernt方法。下面是程序代碼的相關部分:

void add1(QVector<double>::iterator first, QVector<double>::iterator last) { 
    for(; first != last; ++first) { 
     *first += 1; 
    } 
} 

/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
std::for_each(arr.begin(), arr.end(), [](double &x){ ++x; }); 
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
QFuture<void> qf[numThreads]; 
for(int j = 0; j < numThreads; ++j) { 
    qf[j] = QtConcurrent::run(add1, arr.begin()+j*n/numThreads, arr.begin()+(j+1)*n/numThreads-1); 
} 
for(int j = 0; j < numThreads; ++j) { 
    qf[j].waitForFinished(); 
} 

所以我手動在不同的線程分配任務。但我仍然很難獲得性能提升:

181 ms // std::for_each 
163 ms // QtConcurrent::run 

這裏還有什麼不對?

+1

你爲什麼期望加快速度?您在每次循環迭代中都在等待未來。 – juanchopanza

+0

我不是這方面的專家,但我期望map()啓動4個線程,這應該使這個代碼行比STL函數更快地完成。還是我誤解了這個功能的概念? – NullAchtFuffZehn

回答

3

看起來QtConcurrent與簡單使用C++線程原語和roll-your-own-thread-pools相比具有很高的開銷。

template<class T> 
struct threaded_queue { 
    using lock = std::unique_lock<std::mutex>; 
    void push_back(T t) { 
    { 
     lock l(m); 
     data.push_back(std::move(t)); 
    } 
    cv.notify_one(); 
    } 
    boost::optional<T> pop_front() { 
    lock l(m); 
    cv.wait(l, [this]{ return abort || !data.empty(); }); 
    if (abort) return {}; 
    auto r = std::move(data.back()); 
    data.pop_back(); 
    return std::move(r); 
    } 
    void terminate() { 
    { 
     lock l(m); 
     abort = true; 
     data.clear(); 
    } 
    cv.notify_all(); 
    } 
    ~threaded_queue() 
    { 
    terminate(); 
    } 
private: 
    std::mutex m; 
    std::deque<T> data; 
    std::condition_variable cv; 
    bool abort = false; 
}; 
struct thread_pool { 
    thread_pool(std::size_t n = 1) { start_thread(n); } 
    thread_pool(thread_pool&&) = delete; 
    thread_pool& operator=(thread_pool&&) = delete; 
    ~thread_pool() = default; // or `{ terminate(); }` if you want to abandon some tasks 
    template<class F, class R=std::result_of_t<F&()>> 
    std::future<R> queue_task(F task) { 
    std::packaged_task<R()> p(std::move(task)); 
    auto r = p.get_future(); 
    tasks.push_back(std::move(p)); 
    return r; 
    } 
    template<class F, class R=std::result_of_t<F&()>> 
    std::future<R> run_task(F task) { 
    if (threads_active() >= total_threads()) { 
     start_thread(); 
    } 
    return queue_task(std::move(task)); 
    } 
    void terminate() { 
    tasks.terminate(); 
    } 
    std::size_t threads_active() const { 
    return active; 
    } 
    std::size_t total_threads() const { 
    return threads.size(); 
    } 
    void clear_threads() { 
    terminate(); 
    threads.clear(); 
    } 
    void start_thread(std::size_t n = 1) { 
    while(n-->0) { 
     threads.push_back(
     std::async(std::launch::async, 
      [this]{ 
      while(auto task = tasks.pop_front()) { 
       ++active; 
       try{ 
       (*task)(); 
       } catch(...) { 
       --active; 
       throw; 
       } 
       --active; 
      } 
      } 
     ) 
    ); 
    } 
    } 
private: 
    std::vector<std::future<void>> threads; 
    threaded_queue<std::packaged_task<void()>> tasks; 
    std::atomic<std::size_t> active = {}; 
}; 

struct my_timer_t { 
    std::chrono::high_resolution_clock::time_point first; 
    std::chrono::high_resolution_clock::duration duration; 

    void start() { 
     first = std::chrono::high_resolution_clock::now(); 
    } 
    std::chrono::high_resolution_clock::duration finish() { 
     return duration = std::chrono::high_resolution_clock::now()-first; 
    } 
    unsigned long long ms() const { 
     return std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(); 
    } 
}; 
int main() { 
    std::vector<double> arr(1000000, 0); 
    my_timer_t timer; 

    /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
    timer.start(); 
    for(int i = 0; i < 100; ++i) { 
     std::transform(arr.begin(), arr.end(), arr.begin(), [](double x){ return ++x; }); 
    } 
    timer.finish(); 
    auto time_transform = timer.ms(); 
    std::cout << time_transform << "<- std::transform (" << arr[rand()%arr.size()] << ")\n"; 
    /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
    timer.start(); 
    for(int i = 0; i < 100; ++i) { 
     std::for_each(arr.begin(), arr.end(), [](double &x){ ++x; }); 
    } 
    timer.finish(); 
    auto time_for_each = timer.ms(); 
    std::cout << time_for_each << "<- std::for_each (" << arr[rand()%arr.size()] << ")\n"; 
    /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
    enum { num_threads = 8 }; 
    thread_pool pool(num_threads); 
    timer.start(); 
    for(int i = 0; i < 100; ++i) { 
     std::array< std::future<void>, num_threads > tasks; 
     for (int t = 0; t < num_threads; ++t) { 
      tasks[t] = pool.run_task([&,t]{ 
       std::for_each(arr.begin()+(arr.size()/num_threads)*t, arr.begin()+(arr.size()/num_threads)*(t+1), [](double& x){++x;}); 
      }); 
     } 
     // std::cout << "loop! -- " << pool.threads_active() << "/" << pool.total_threads() << std::endl; 
     for (int t = 0; t < num_threads; ++t) 
      tasks[t].wait(); 
    } 
    timer.finish(); 
    auto time_pool = timer.ms(); 
    std::cout << time_pool << "<- thread_pool (" << arr[rand()%arr.size()] << ")\n"; 
} 

Live example。用一個簡單的C++ 11線程池來劃分任務8周時的方式

153<- std::transform (100) 
131<- std::for_each (200) 
82<- thread_pool (300) 

一個顯著加速:

此產生。 (當分裂任務4種方式時大約是105)。

現在我確實使用了比您的測試集小10倍的測試集,因爲在我的程序運行了很長時間之後,在線系統超時。

將會有開銷與您的線程池系統進行通信,但我的天真線程池不應該超過像這樣的真正的庫。

現在,一個嚴重的問題是,你可能是內存IO綁定;如果所有人都必須等待字節,更多線程更快地訪問字節將無濟於事。

+1

你是如何測試'QtConcurrent'的開銷的?請注意,您將'++'操作分組爲'num_threads'批次。你也可以用'QtConcurrent'來做到這一點。 – m7913d

+0

@ m7913d這就是'QtConcurrent'應該做的事情;根據硬件線程的數量啓動一些子線程來處理部分任務。我只是手動做了。我通過'for_each'獲得了顯着的提速。 – Yakk

+0

'QtConcurrent'將每個操作分配給一個線程(考慮到最大併發線程數)。它不會將它們分組。請注意,如果它們可能沒有花費相同的時間,則不能直接對組進行操作。 – m7913d

相關問題