0

我已經在C++中實現了一個多線程合併排序,但是我碰到了一堵牆。如何在多線程時停止用盡堆棧空間?

在我的實現,我遞歸地拆分輸入向量分成兩個部分,然後線程這兩個部分:

void MergeSort(vector<int> *in) 
{ 
if(in->size() < 2) 
    return; 

vector<int>::iterator ite = in->begin(); 
vector<int> left = vector<int> (ite, ite + in->size()/2); 
vector<int> right = vector<int> (ite + in->size()/2, in->end()); 

//current thread spawns 2 threads HERE 
thread t1 = thread(MergeSort, &left); 
thread t2 = thread(MergeSort, &right); 

t1.join(); 
t2.join(); 

vector<int> ret; 
ret.reserve(in->size()); 

ret = MergeSortMerge(left, right); 

in->clear(); 
in->insert(in->begin(), ret.begin(), ret.end()); 

return; 
} 

的代碼看起來是很漂亮,但它是最惡毒的代碼我有一個曾經寫過。試圖排序一個超過1000個int值的數組導致這麼多的線程產生,我用完堆棧空間,我的電腦BSOD :(一致。很多線程,這不是很好,但技術上(如果不是理論上),這是不是一個適當的實施?

基於一點谷歌搜索,我似乎已經發現需要一個線程池。線程池解決我遇到的基本問題,我試圖產生太多的線程的事實?如果是這樣,你有任何關於庫的建議?

謝謝你的建議和幫助!

+4

使用線程的一個原因是使用計算機中的所有內核。在這種情況下,創建比核心更多的線程沒有任何價值。 –

+1

即使這個工作,它會很慢,遞歸地產生線程肯定聽起來像是一場災難的祕訣。正如Brian所提到的,一個想法是將線程限制爲'std :: thread :: hardware_concurrency'的值,但即使如此,我也不確定這會比'std :: sort'更快。 – user2802841

+0

你不應該能夠從這引起BSOD ......什麼操作系統?假設Windows ...什麼版本? (對於我自己的好奇心,我同意已發佈的答案) – TypeIA

回答

1

由於zdan解釋,你要限制的線程數。有兩點需要考慮,以確定什麼是極限,

  1. CPU核心的數量。在C++ 11中,您可以使用std::thread::hardware_concurrency()來確定硬件核心。然而,這個函數可能返回0意味着程序不知道有多少核心,在這種情況下,你可能會認爲這個值是2或4.

  2. 受限於被處理數據的數量。您可以將要由線程處理的數據劃分爲每個線程1個數據,但只有1個數據會花費太多,並且不具有成本效益。例如,你可以說,當數據的數量小於50時,你不想再劃分。因此,您可以根據total_data_number/50 + 1之類的內容確定所需的最大線程數。

然後,您選擇案例1中的最小數字&案例2來確定限制。

在你的情況,因爲你是遞歸生成的線程,你可以嘗試確定以類似的方式遞歸深度。

+0

啊,所以我會手動設置遞歸深度的數量,然後讓每個線程使用盡可能多的數據。謝謝你的解釋,這非常有幫助。 – lululoo

1

我不認爲一個線程池是要幫助你。由於您的算法是遞歸的,因此您將得到池中所有線程都被佔用的點,並且池不會再創建任何線程,並且算法會被阻塞。

你很可能只是限制你的線程創建遞歸深度爲2或3(除非你有CPU的的。它不會讓任何性能上的差異)。

+0

我看到了...所以我的設計必須考慮到我的線程數量有限,因爲我試圖利用多個線程。 T_T 其實,你知道我會怎麼去限制這種計算的遞歸深度嗎? – lululoo

0

你可以設置你的堆棧空間的限制,但它是徒勞的。線程太多,即使有一個池,也會以log2(N)*每線程成本將其吞掉。尋找迭代方法並減少開銷。開銷是殺手。 就性能而言,您會發現使用N個線程的過度提交的某個級別,其中硬件併發性可能會產生最佳結果。開銷和每核心工作之間會有很好的平衡。如果N非常大,比如在GPU上,那麼存在其他選項(Bitonic),這些選項會進行不同的折衷以減少通信(等待/連接)開銷。

假設你有一個任務管理器,是池莉構建N個允許通過等待任務之前通知信號, `

#include <algorithm> 
#include <array> 
#include <cstdint> 
#include <vector> 
#include <sometaskmanager.h> 

void parallel_merge(size_t N) { 
    std::array<int, 1000> ary {0}; 
    // fill array... 
    intmax_t stride_size = ary.size()/N; //TODO: Put a MIN size here 
    auto semaphore = make_semaphore(N); 
    using iterator = typename std::array<int, 1000>::iterator; 
    std::vector<std::pair<iterator, iterator>> ranges; 
    auto last_it = ary.begin(); 
    for(intmax_t n=stride_size; n<N; n +=stride_size ) { 
     ranges.emplace_back(last_it, std::next(last_it, std::min(std::distance(last_it, ary.end()), stride_size))); 
     semaphore.notify(); 
    } 
    for(auto const & rng: ranges) { 
     add_task([&semaphore,rng]() { 
     std::sort(rng.first, rng.second); 
     }); 
    } 
    semaphore.wait(); 
    std::vector<std::pair<iterator, iterator>> new_rng; 
    while(ranges.size() > 1) { 
     semaphore = make_semaphore(ranges.size()/2); 
     for(size_t n=0; n<ranges.size(); n+=2) { 
      auto first=ranges[n].first; 
      auto last=ranges[n+1].second; 
      add_task([&semaphore, first, mid=ranges[n].second, last]() { 
       std::inplace_merge(first, mid, last); 
       semaphore.notify(); 
      }); 
      new_rng.emplace_back(first, last); 
     } 
     if(ranges.size() % 2 != 0) { 
      new_rng.push_back(ranges.back()); 
     } 
     ranges = new_rng; 
     semaphore.wait(); 
    } 
} 

正如你所看到的,瓶頸是在合併階段的有必須做很多協調工作。 Sean Parent做了一個很好的介紹,如果你沒有一個任務管理器,並且在他的演示文稿Better Code:Concurrency,http://sean-parent.stlab.cc/presentations/2016-11-16-concurrency/2016-11-16-concurrency.pdf的演示文稿中介紹了相關的性能分析, TBB和PPL有任務經理。