2012-08-10 52 views
3

我想以多線程方式實現分支和邊界搜索。特別是,我想使用async在每個分支中包裝搜索調用,然後等待某個線程出現答案,然後退出。 (理想情況下,我想取消其他線程,但線程取消不在標準中)。這裏有一些簡化代碼:使用C++中的futures,異步和線程實現搜索11

#include <iostream> 
#include <random> 
#include <future> 
#include <thread> 

using namespace std; 

mt19937 rng; 
uniform_int_distribution<unsigned> random_binary(0, 1); 

bool search() { 
    return static_cast<bool>(random_binary(rng)); 
} 

#define N 10000 

int main() 
{ 
    rng.seed(42); 

    std::vector<future<bool>> tasks; 

    for (unsigned i=0; i<N; ++i) 
    tasks.push_back(async(launch::async, search)); 

    // Don't want to wait sequentially here. 
    for (unsigned i=0; i<N; ++i) { 
    tasks[i].wait(); 
    if (tasks[i].get()) { 
     cout << "i = " << i << "\n"; 
     break; 
    } 
    } 
    return 0; 
} 

search()是搜索功能。它根據是否找到答案返回true/false。爲了說明,我返回一個隨機答案。但問題的關鍵在於調用tasks[i].wait()的for循環。現在,我正在順序地等待任務完成。相反,我想要做這樣的事情:

auto x = wait_for_any(tasks.begin(), tasks.end()); 
x.get(); 
// cancel other threads. 
// Profit? 

什麼是實現這一目標的好方法?

+0

順便說一句,你在UB-土地已經來了由具有多線程調用'random_binary(RNG)',這是_not_線程安全的。 – ildjarn 2012-08-11 02:55:36

回答

8

std::future提供了一個valid()函數,可以讓您檢查結果是否可用而不會阻塞,因此您可以使用該函數,例如,在繁忙的等待循環:

std::future<bool>* res_future = 0; 
for(size_t i = 0; ; i==tasks.size()?i=0:++i){ 
    // could add a timeout period to not completely busy-wait the CPU 
    if(tasks[i].wait_for(std::chrono::seconds(0)) == std::future_status::ready){ 
    res = &tasks[i]; 
    break; 
    } 
} 

bool res = res_future->get(); 

擬議除了std::future,做出這樣的任務更容易,是一個.then(func_obj)方法異步調用func_obj當結果是可用的,在那裏你可以設置標誌或東西。

我遺憾地不知道有一種方法可能以上述以外的其他方式實施wait_for_any。 :/

template<class FwdIt> 
std::future<bool> wait_for_any(FwdIt first, FwdIt last) 
{ 
    return std::async([=]{ 
    for(FwdIt cur(first); ; cur==last?cur=first:++cur){ 
    // could add a timeout period to not completely busy-wait the CPU 
    if(cur->wait_for(std::chrono::seconds(0)) == std::future_status::ready) 
     return cur->get(); 
    }); 
} 

線程破壞通常通過協作取消來完成。

體育:std::future<T>::get()將自動wait()如果結果不可用。

+0

你的代碼比我的代碼好得多,但它仍然按順序同步輪詢每個線程。我想知道是否有更多的異步解決方案。 – keveman 2012-08-10 22:55:41

+2

@keveman:我想你可能需要放棄'std :: future' /'std :: async',並用'std :: condition_variable'把自己放在一起。 – Xeo 2012-08-10 23:01:14

+0

@keveman:看看編輯,它現在「更加異步」,因爲主線程在請求​​之前不會被阻塞。 ;) – Xeo 2012-08-10 23:20:58

6

安排所有任務訪問相同的condition_variable,mutexbool。這可以通過使這些全局變量或成員數據在每個任務運行成員函數的情況下完成,或者可以通過std::ref作爲參數在任務創建函數中傳遞它們。

在開始任何任務之前將bool初始化爲not_found。主線程然後啓動任務並等待condition_variable。搜索者任務然後搜索。當他們搜索時,他們偶爾會檢查bool(可能是原子負載),看看它是否已設置爲found。如果有,搜索器線程立即返回。

當一個線程找到結果時,它將bool設置爲found並指示condition_variable。這將喚醒主線程並有效地取消其餘的搜索任務。然後主線程可以加入,分離,放棄任何搜索任務。如果您沒有主要顯式加入搜索任務,那麼最好是如果您可以安排所有搜索任務在主要退出之前結束。

沒有輪詢。沒有等待死衚衕的搜索。唯一特別的部分是弄清楚搜索者任務如何以及經常檢查bool。我建議性能測試這部分。

+0

檢查'bool'部件對我的用例是一種侵入性。我從單線程代碼開始,但想要並行地在分支上運行搜索。我猜這在嘗試並行化現有代碼庫時並不罕見。當然,我會咬緊牙關,做一些侵入式的修改,但是我希望我有thread :: cancel。 – keveman 2012-08-10 23:25:55

+0

@keveman:不,你沒有。在沒有線程合作的情況下取消線程是沒有意義的。如果它鎖定了一個互斥體並且因爲殺死它而無法解鎖它呢?這就是合作取消的目的。 – Xeo 2012-08-10 23:29:43

+0

@Xeo再次看看我的用例,我從順序代碼開始,沒有互斥或任何這樣的東西。我同意,thread :: cancel可能會導致死鎖等,因爲被取消的線程可能處於任何狀態,應該小心使用。不過,我只是認爲thread :: cancel對於移植某些類的順序程序很有用。 – keveman 2012-08-10 23:43:55

0

我會採取的方法是讓主函數創建一個std::promise,然後將其作爲參考進行工作的各個線程。然後,每個工作線程將使用他們共享的std::promise在他們得到結果後使用set_value()

哪個工作線程首先到達那裏將能夠發送結果,其他人將在嘗試使用set_value()時得到異常,並且它已被設置。

在下面的源代碼框架中,我使用的是std::async,但如果我們可以在不創建額外的std::promise而不用任何東西的情況下剝離自治線程,那將會很好。

因此代碼框架看起來是這樣的:

#include <iostream> 
#include <thread> // std::thread is defined here 
#include <future> // std::future and std::promise defined here 

bool search(int args, std::promise<int> &p) { 

    // do whatever needs to be done 

    try { 
     p.set_value(args); 
    } 
    catch (std::future_error & e) { 
     // std::future_errc::promise_already_satisfied 
    } 
    return true; 
} 
int main(int argc, char * argv[]) 
{ 
    std::promise<int> myProm;  
    auto fut = myProm.get_future(); 

    for (int i = 1; i < 4; i++) { 
     std::async(std::launch::async, search, i, std::ref(myProm)); 
    } 

    auto retVal = fut.get(); 
    return 0; 
}