2012-05-21 43 views
6

我發現自己經常寫形式的C++代碼:慣用方式並行跨文件系函數C++

while (getline(strm, line)) { 
    cout << computationally_intensive_function(line) << endl; 
} 

我想這個並行代碼。我想出了迄今爲止最好的解決方案是構建字符串矢量持有行的大(10000-100000)號碼,然後並行在此向量

#pragma omp parallel for 

然後清空載體和重複而線仍然存在。但是,此方法需要大量內存,其他內核處於空閒狀態,而主進程正在緩衝字符串。有沒有更好的辦法?像Python的multiprocessing.Pool.map或Hadoop? (我想避免但使用Hadoop的C++ API,因爲Hadoop是相當重量級,不得隨處安裝我的代碼將被運行。)

+2

如果你有訪問C++ 11編譯器,你可以使用'std :: async'或者用'std :: packaged_task'實現你自己的線程池。 – xDD

+0

如果每一行都是獨立的,並且您的文件可用,那麼爲什麼不提前將其切斷,並將其提供給此過程的多個實例? –

+0

冗長的故事,你需要潛在的不止一個核心緩衝字符串。您需要能夠平衡內核之間的負載,以便儘可能多地執行計算密集型功能,而緩衝中的線程也將全速運行。這不是一個微不足道的答案 –

回答

5

有存在不爲人所熟知的OpenMP 3.0 任務,這是特色相當不幸,因爲它們是專門爲此類案件設計的。如果您的編譯器支持該標準版本,那麼您肯定應該參加OpenMP任務。但請記住,寫從多個線程stdout(或std::cout)一般混合它們的輸出不好,你最有可能想在它同步:

#pragma omp parallel 
{ 
    #pragma omp master 
    while (getline(strm, line)) 
    #pragma omp task 
    { 
     result_type result = computationally_intensive_function(line); 
     #pragma omp critical 
     { 
      cout << result << endl; 
      cout.flush(); 
     } 
    } 
    #pragma omp taskwait 
} 

我把它留給你來決定哪些變量應該是shared和什麼應該是private

+0

正是我需要的!謝謝!! – gilesc

1

您應該將您的計算與文件中的讀取線重疊。一個好的方法是使用線程構建模塊管道算法。你所做的是你指定三個(根據你在僞代碼示例中顯示的內容)過濾器,兩個串行一個和一個並行。串行過濾器是輸入和輸出過濾器。第一種方法是逐行從文件中讀取數據,並將每行傳遞給第二個過濾器,該過濾器是並行的,並以多線程模式運行計算/處理功能。最後一個階段/過濾器也是串行的,它輸出。我的複製粘貼從TBB教程,這似乎是在做正是你想要達到什麼樣的一個例子:

// Holds a slice of text. 
/** Instances *must* be allocated/freed using methods herein, because the 
C++ declaration 
represents only the header of a much larger object in memory. */ 
class TextSlice { 
    // Pointer to one past last character in sequence 
    char* logical_end; 
    // Pointer to one past last available byte in sequence. 
    char* physical_end; 
public: 
    // Allocate a TextSlice object that can hold up to max_size characters. 
    static TextSlice* allocate(size_t max_size) { 
     // +1 leaves room for a terminating null character. 
     TextSlice* t = (TextSlice*)tbb::tbb_allocator<char>().allocate(sizeof(TextSlice)+max_size+1); 
     t->logical_end = t->begin(); 
     t->physical_end = t->begin()+max_size; 
     return t; 
    } 
    // Free this TextSlice object 
    void free() { 
     tbb::tbb_allocator<char>().deallocate((char*)this, 
     sizeof(TextSlice)+(physical_end-begin())+1); 
    } 
    // Pointer to beginning of sequence 
    char* begin() {return (char*)(this+1);} 
    // Pointer to one past last character in sequence 
    char* end() {return logical_end;} 
    // Length of sequence 
    size_t size() const {return logical_end-(char*)(this+1);} 
    // Maximum number of characters that can be appended to sequence 
    size_t avail() const {return physical_end-logical_end;} 
    // Append sequence [first,last) to this sequence. 
    void append(char* first, char* last) { 
     memcpy(logical_end, first, last-first); 
     logical_end += last-first; 
    } 
    // Set end() to given value. 
    void set_end(char* p) {logical_end=p;} 
}; 

和函數來得到這個運行是:

void RunPipeline(int ntoken, FILE* input_file, FILE* output_file) { 
    tbb::parallel_pipeline(
    ntoken, 
    tbb::make_filter<void,TextSlice*>(
    tbb::filter::serial_in_order, MyInputFunc(input_file)) 
    & 
    tbb::make_filter<TextSlice*,TextSlice*>(
    tbb::filter::parallel, MyTransformFunc()) 
    & 
    tbb::make_filter<TextSlice*,void>(
    tbb::filter::serial_in_order, MyOutputFunc(output_file))); 
}