2013-06-19 54 views
20

爲了給出一些背景信息,我正在處理一個保存的文件,並且在使用正則表達式將文件拆分爲它的組件對象之後,我需要根據它是哪種類型的對象來處理對象的數據。很多時候使用std :: async爲小任務性能友好?

我現在的想法是使用並行獲得的性能增益裝載的一點點每個對象都是相互獨立的。所以我要定義一個LoadObject函數接受std::string爲對象的每個類型,我將要處理,然後調用std::async如下:

void LoadFromFile(const std::string& szFileName) 
{ 
    static const std::regex regexObject("=== ([^=]+) ===\\n((?:.|\\n)*)\\n=== END \\1 ===", std::regex_constants::ECMAScript | std::regex_constants::optimize); 

    std::ifstream inFile(szFileName); 
    inFile.exceptions(std::ifstream::failbit | std::ifstream::badbit); 

    std::string szFileData((std::istreambuf_iterator<char>(inFile)), (std::istreambuf_iterator<char>())); 

    inFile.close(); 

    std::vector<std::future<void>> vecFutures; 

    for(std::sregex_iterator itObject(szFileData.cbegin(), szFileData.cend(), regexObject), end; itObject != end; ++itObject) 
    { 
      // Determine what type of object we're loading: 
      if((*itObject)[1] == "Type1") 
      { 
       vecFutures.emplace_back(std::async(LoadType1, (*itObject)[2].str())); 
      } 
      else if((*itObject)[1] == "Type2") 
      { 
       vecFutures.emplace_back(std::async(LoadType2, (*itObject)[2].str())); 
      } 
      else 
      { 
       throw std::runtime_error("Unexpected type encountered whilst reading data file."); 
      } 
    } 

    // Make sure all our tasks completed: 
    for(auto& future : vecFutures) 
    { 
      future.get(); 
    } 
} 

注意,將有超過200種應用程序(這只是一個簡短的例子),並且可能會讀取文件中的數千個對象。

我知道,創建太多的線程對於性能來說通常是一件壞事,因爲它會超出由於上下文切換而導致的最大硬件併發性,但是如果我的內存正確地服務於我,C++運行時應該監視創建的線程數並適當安排std::async(我相信微軟的情況下他們的ConcRT庫負責這個?),所以上面的代碼仍然可以提高性能?

在此先感謝!

+0

上面的代碼*可能*在事實上導致了性能的提高,但我會說,這取決於每個'LoadTypeX'正在做的工作量。是否足以超過您在主線程中發起的開銷和等待以及同步的開銷?更不用說越來越多的緩存未命中和假分支。以及與多線程編程相關的其他處罰。所以,如果你的對象很大,你的異步加載函數做了大量的工作,我會說這可能是值得的。但爲什麼你不測量? – yzt

+4

無關:您正在創建100個默認構建期貨的向量,然後在最後附加您的真實期貨。對那些未定義行爲的默認構造期貨結果調用'get()'。 – Casey

+0

你分析了你的代碼嗎?我預計I/O成本將處理成本壓縮到將處理分解爲線程的收益可能無法衡量的程度。 –

回答

14

C++運行時應該監視創建的線程和調度性病的數量::異步適當

號如果異步任務其實都是異步的(而不是推遲),那麼所有的運行需要的是它們在新線程上運行。對於每個任務創建和啓動的新線程而言,完全有效,而不考慮硬件的並行能力有限。

有一張紙條:

[注:如果該政策與其他政策,比如在使用推出的策略值時,作爲同時指定::異步| launch :: deferred, 實現應該推遲調用或選擇策略 ,否則無法有效利用更多併發。末端注]

然而,這是不規範的,並且在任何情況下,它表明,一旦沒有更多的併發性,可被利用的任務可能會變得延遲,因此,當有人在等候結果得到執行,而不是仍然是異步的並且在完成之前的異步任務之一後立即運行,這對於最大並行性是期望的。

也就是說,如果我們有10個長期運行的任務和實現只可以並行執行4,那麼第4將是異步的,而在過去6可能會被推遲。依次等待期貨將在單個線程上依次執行延期任務,從而消除這些任務的並行執行。

該音符不也說,而不是推遲調用,政策的選擇可能會被推遲。也就是說,函數仍然可以異步運行,但是這個決定可能會延遲,比如說,直到之前的任務之一完成,釋放一個新任務的核心。但是,這不是必需的,這個註釋是非規範的,據我所知,微軟的實現是唯一一個這樣表現的實現。當我查看另一個實現libC++時,它完全忽略了這個註釋,因此使用std::launch::asyncstd::launch::any策略會導致在新線程上異步執行。

(我相信在微軟的情況下,它們ConcRT庫負責呢?)

象你所說的,但是這不是必需的,可移植程序不能依靠這種行爲Microsoft的實現確實表現。

一種方法可移植限制多少線程實際上正在運行的是使用像一個信號:

#include <future> 
#include <mutex> 
#include <cstdio> 

// a semaphore class 
// 
// All threads can wait on this object. When a waiting thread 
// is woken up, it does its work and then notifies another waiting thread. 
// In this way only n threads will be be doing work at any time. 
// 
class Semaphore { 
private: 
    std::mutex m; 
    std::condition_variable cv; 
    unsigned int count; 

public: 
    Semaphore(int n) : count(n) {} 
    void notify() { 
     std::unique_lock<std::mutex> l(m); 
     ++count; 
     cv.notify_one(); 
    } 
    void wait() { 
     std::unique_lock<std::mutex> l(m); 
     cv.wait(l, [this]{ return count!=0; }); 
     --count; 
    } 
}; 

// an RAII class to handle waiting and notifying the next thread 
// Work is done between when the object is created and destroyed 
class Semaphore_waiter_notifier { 
    Semaphore &s; 
public: 
    Semaphore_waiter_notifier(Semaphore &s) : s{s} { s.wait(); } 
    ~Semaphore_waiter_notifier() { s.notify(); } 
}; 

// some inefficient work for our threads to do 
int fib(int n) { 
    if (n<2) return n; 
    return fib(n-1) + fib(n-2); 
} 

// for_each algorithm for iterating over a container but also 
// making an integer index available. 
// 
// f is called like f(index, element) 
template<typename Container, typename F> 
F for_each(Container &c, F f) { 
    Container::size_type i = 0; 
    for (auto &e : c) 
     f(i++, e); 
    return f; 
} 

// global semaphore so that lambdas don't have to capture it 
Semaphore thread_limiter(4); 

int main() { 
    std::vector<int> input(100); 
    for_each(input, [](int i, int &e) { e = (i%10) + 35; }); 

    std::vector<std::future<int>> output; 
    for_each(input, [&output](int i, int e) { 
     output.push_back(std::async(std::launch::async, [] (int task, int n) -> int { 
      Semaphore_waiter_notifier w(thread_limiter); 
      std::printf("Starting task %d\n", task); 
      int res = fib(n); 
      std::printf("\t\t\t\t\t\tTask %d finished\n", task); 
      return res; 
     }, i, e)); 
    }); 

    for_each(output, [](int i, std::future<int> &e) { 
     std::printf("\t\t\tWaiting on task %d\n", i); 
     int res = e.get(); 
     std::printf("\t\t\t\t\t\t\t\t\tTask %d result: %d\n", i, res); 
    }); 
} 
+0

謝謝你的深入,簡潔的回答。然而,你是否會知道在微軟的特定情況下,使用'std :: async'創建的任務是否被延遲,直到對wait()或get()進行調用或者是否延遲到一個線程已完成? –

+0

@ Shaktal他們不會被推遲;它們在線程池上異步執行,以按照您所描述的方式使用ConcRT限制超額訂閱。 – bames53

+0

棒極了!這個解決方案是完整的,非常容易理解,並且可以修改爲真實的商業案例。在搜索C++多線程教程幾個月後,我很幸運地找到這篇文章。你能推薦我應該在哪裏閱讀更多(書/網頁/視頻)嗎? XD – cppBeginner