2016-11-20 75 views
1

我有一個可變大小的大向量。我想檢查每個元素(在矢量的特定索引範圍lowerRange-upperRange內)是否滿足某個條件?在下面的例子中,我的輸入向量包含9個元素,我想檢查從2到6的元素是否滿足check_if_condition()。在這裏,lowerRange = 2和upperRange = 6在向量C++中並行化搜索

爲此,我寫了下面的並行代碼來做同樣的事情,但是,這個代碼的問題是它給出了錯誤:「glibc detect smallbin linked list corrupted」。我嘗試使用valgrind調試代碼,但我無法找到錯誤的確切原因。

我的實際的實際輸入向量包含10000000個元素,我想檢查999999(lowerRange)-9999999(upperRange)之間的元素(這個範圍是由用戶指定的,儘管我已經把這個範圍當作代碼中的常量)索引元素滿足check_if_condition

#include <thread> 
#include <vector> 
#include <iostream> 
#include <atomic> 

unsigned check_if_condition(int a) 
{ 
    //Long check here 
    return 1; 
} 

void doWork(std::vector<unsigned>& input, std::vector<unsigned>& results, unsigned assigned, size_t current, size_t end, std::atomic_int& totalPassed) 
{ 
    end = std::min(end, input.size()-2); 
    int numPassed = 0;  
    for(; (current) < end; ++current) { 
     if(check_if_condition(input[current])) { 
      results[current] = true; 
      ++numPassed; 
     } 
    } 

    totalPassed.fetch_add(numPassed); 
} 

int main() 
{ 
    std::vector<unsigned> input;//(1000000); 
    input.push_back(0); input.push_back(1); input.push_back(2); input.push_back(3); input.push_back(4); input.push_back(5); input.push_back(6); input.push_back(7); input.push_back(8); 
    std::vector<unsigned> results(input.size()); 
    std::atomic_int numPassed(0);   
    auto numThreads = std::thread::hardware_concurrency();  
    std::vector<std::thread> threads; 
    unsigned assigned; 

    if(numThreads> input.size()) 
     numThreads=input.size(); 
    std::cout<<"numThreads="<<numThreads<<"\n"; 
    auto blockSize = input.size()/numThreads; 
    for(size_t i = 0; i < numThreads - 1; ++i) //check whether elements from 2 to 6 satisfy check_if_condition 
     threads.emplace_back(doWork, std::ref(input), std::ref(results), assigned,((i+2) * blockSize), ((i+3) * blockSize), std::ref(numPassed)); 


    for(auto& thread : threads) 
     thread.join(); 


    std::vector<int> storage; 
    storage.reserve(numPassed.load()); 

    auto itRes = results.begin(); 
    auto itInput = input.begin(); 
    auto endRes = results.end(); 
    for(; itRes != endRes; ++itRes, ++itInput) { 
     if(*itRes) 
      storage.emplace_back(*itInput);    
    } 

    std::cout<<"\n Storage:"; 
    for(std::vector<int>::iterator i1=storage.begin(), l1=storage.end(); i1!=l1; ++i1) 
     std::cout<<" "<<(*i1)<<"\n"; 

    std::cout << "Done" << std::endl; 
} 
+0

您需要確保在向量子範圍上工作的努力不超過向量子範圍本身的工作。使用10個線程和1000萬個元素,每個線程將有100萬個元素;如果工作是每個元素幾十個機器指令,那麼每個線程都會有1億條指令,這應該足夠多。如果你只有10萬個元素,那麼你可能沒有足夠的資源來完成這項工作,這將是一個放緩的過程。真的世界應用程序中真的有1000萬個元素嗎? –

回答

3

要檢查end但不currentdoWork,因此你對

for(size_t i = 0; i < numThreads - 1; ++i) //check whether elements from 2 to 6 satisfy check_if_condition 
     threads.emplace_back(doWork, std::ref(input), std::ref(results), assigned,((i+2) * blockSize), ((i+3) * blockSize), std::ref(numPassed)); 

你的最後一次迭代閱讀過去的矢量比方說你的載體是1000元大,你的線程數是8,在最後一次迭代中,您將獲得:

i = 7;

current =(7 + 2)* 125 = 1125;

end =(7 + 3)* 125 = 1250;


所以要均勻地分配工作的線程之間對於給定的子範圍[rangeStart,則rangeEnd的),你需要執行下面的循環:

for(size_t i = 0; i < numThreads; ++i) 
{ 
    auto start = rangeStart + i * blockSize; 
    auto end = (i == numThreads - 1) ? rangeEnd : start + (i+1) * blockSize; 
    threads.emplace_back(doWork, std::ref(input), std::ref(results), assigned, start, end, std::ref(numPassed)); 
} 

注意,在最後一次迭代end直接設置爲rangeEnd的使最後一個線程可能略有更多的工作要做

此外,塊大小應調整:

auto blockSize = (rangeEnd - rangeStart)/numThreads; 
+0

ok..thanks很多...我如何修復它 –

+1

首先所有的塊大小應(rangeEnd的 - 具有rangeStart)/ numOf線程 –

+0

二的所有電流='範圍+ I * blockSize'和'結束=啓動+(1 + 1)* blockSize' –