2013-05-29 64 views
0

我發現openmp不支持while循環(或者至少不喜歡它們太多)。 也不喜歡'!='操作符。openmp - while循環文本文件讀取和使用管道

我有這一點的代碼。

int count = 1; 
#pragma omp parallel for 
    while (fgets(buff, BUFF_SIZE, f) != NULL) 
    { 
     len = strlen(buff); 
     int sequence_counter = segment_read(buff,len,count); 
     if (sequence_counter == 1) 
     { 
      count_of_reads++; 
      printf("\n Total No. of reads: %d \n",count_of_reads); 
     } 
    count++; 
    } 

任何線索,如何管理?我讀了一個地方(包括stackoverflow的另一篇文章),我可以使用管道。那是什麼 ?以及如何實施它?

+0

編譯提供了一個鏈接到計算器後你看的,請。 – Shahbaz

+1

@Shahbaz,我想他可能是指該SO發佈 http://stackoverflow.com/questions/8121077/fread-slow-performance-in-openmp-threads – 2013-05-30 08:29:47

+0

其實...這一個.. HTTP ://stackoverflow.com/questions/7532067/parallelize-while-loop-with-openmp 但這也是相關的! – Sid5427

回答

3

在OpenMP中實現「parallel while」的一種方法是使用創建任務的while循環。這是一個普遍的草圖:

void foo() { 
    while(Foo* f = get_next_thing()) { 
#pragma omp task firstprivate(f) 
     bar(f); 
    } 
#pragma omp taskwait 
} 

對於遍歷與fgets的具體情況,注意與fgets具有固有的順序語義(它獲得了「下一個」行),這樣就需要啓動任務之前調用。對於每個任務來說,對fgets返回的數據副本進行操作也很重要,這樣對fgets的調用不會覆蓋前一個任務所操作的緩衝區。

+0

這些任務可以調用函數嗎?即調用函數的任務,在char數組上執行函數,並返回一個值? – Sid5427

+0

是的,任務可以調用函數。你必須小心的是確保沒有兩個併發任務互相干擾。例如,如果我忘記在我的例子中寫下「firstprivate(f),那麼當bar(f)開始實際啓動時,f的值可能已經消失或者被下一次迭代覆蓋 –

+3

'f'這是默認的firstprivate,'task'構造中的'firstprivate'子句是多餘的。 –

1

首先,即使它非常接近,但openmp不會奇蹟般地讓你的代碼並行。它適用於for,因爲for具有它可以理解的上限和下限。 Openmp使用這些界限在不同線程之間劃分工作。

while循環沒有這種可能。

其次,你如何期望你的任務是並行化的?您正在從一個文件中讀取數據,其中順序訪問可能會給您比並行訪問更好的性能。您可能會並行segment_read(基於其實施)。

或者,您可能希望將文件讀取與處理重疊。爲此,您需要使用更多的低級函數,例如Unix的openread函數。然後,執行異步讀取,即發送讀取請求,處理最後一個讀取塊,然後等待讀取請求完成。例如,搜索「linux異步io」以閱讀更多內容。

使用管道可能實際上並沒有多大幫助。這將取決於我不熟悉的許多管道內部結構。但是,如果您擁有足夠大的內存,則可能還需要考慮先加載整個數據,然後再處理它。這樣,加載數據的速度將盡可能快(按順序),然​​後您可以並行處理它們。

10

這太糟糕了,人們很快選擇最佳答案。這是我的答案。
首先,你應該把文件讀入一個類似fread的緩衝區。這很快。如何做到這一點的例子可以在這裏找到http://www.cplusplus.com/reference/cstdio/fread/

然後,你可以在OpenMP上並行操作緩衝區。我已經爲你實施了大部分。以下是代碼。你沒有提供segment_read函數,所以我創建了一個虛擬函數。我使用了C++中的一些函數,比如std :: vector和std :: sort,但在純C中也可以做更多的工作。

編輯︰ 我編輯此代碼,並能夠刪除排序和關鍵部分。

g++ foo.cpp -o foo -fopenmp -O3

#include <stdio.h> 
#include <omp.h> 
#include <vector> 

using namespace std; 

int segment_read(char *buff, const int len, const int count) { 
    return 1; 
} 

void foo(char* buffer, size_t size) { 
    int count_of_reads = 0; 
    int count = 1; 
    std::vector<int> *posa; 
    int nthreads; 

    #pragma omp parallel 
    { 
     nthreads = omp_get_num_threads(); 
     const int ithread = omp_get_thread_num(); 
     #pragma omp single 
     { 
      posa = new vector<int>[nthreads]; 
      posa[0].push_back(0); 
     } 

     //get the number of lines and end of line position 
     #pragma omp for reduction(+: count) 
     for(int i=0; i<size; i++) { 
      if(buffer[i] == '\n') { //should add EOF as well to be safe 
       count++; 
       posa[ithread].push_back(i); 
      } 
     } 

     #pragma omp for  
     for(int i=1; i<count ;i++) {  
      const int len = posa[ithread][i] - posa[ithread][i-1]; 
      char* buff = &buffer[posa[ithread][i-1]]; 
      const int sequence_counter = segment_read(buff,len,i); 
      if (sequence_counter == 1) { 
       #pragma omp atomic 
       count_of_reads++; 
       printf("\n Total No. of reads: %d \n",count_of_reads); 
      } 

     } 
    } 
    delete[] posa; 
} 

int main() { 
    FILE * pFile; 
    long lSize; 
    char * buffer; 
    size_t result; 

    pFile = fopen ("myfile.txt" , "rb"); 
    if (pFile==NULL) {fputs ("File error",stderr); exit (1);} 

    // obtain file size: 
    fseek (pFile , 0 , SEEK_END); 
    lSize = ftell (pFile); 
    rewind (pFile); 

    // allocate memory to contain the whole file: 
    buffer = (char*) malloc (sizeof(char)*lSize); 
    if (buffer == NULL) {fputs ("Memory error",stderr); exit (2);} 

    // copy the file into the buffer: 
    result = fread (buffer,1,lSize,pFile); 
    if (result != lSize) {fputs ("Reading error",stderr); exit (3);} 

    /* the whole file is now loaded in the memory buffer. */ 
    foo(buffer, result); 
    // terminate 


    fclose (pFile); 
    free (buffer); 
    return 0; 
} 
+0

我絕對愛護這個答案。很抱歉,最好的答案是「大多數問題似乎讓他們儘快解決」。 – Sid5427

+1

我編輯過代碼(再次),線程的內部循環是錯誤的,我遇到的主要問題是線程隨機進入,所以segment_read可能不會按順序調用,這可能不是問題。消息是帶有位置的變量是有序的,換句話說,posa [0]是最低位置的向量,posa [7](使用8個線程)是位置最高的向量,所以如果你需要爲了您的位置g他們。最初,我使用sort()和一個關鍵部分來做到這一點,但最新的代碼並不需要這樣做。 – 2013-05-30 13:57:40

+0

多數民衆贊成在我這個問題最主要的是,我希望線被順序讀取,但segment_read中的一些子句可能會提前終止該線程。這會有什麼影響嗎?我的整個想法是整個segment_read函數並行運行。我有一個8核心機器,所以你可以假設8個segment_reads在文件的8個不同行中運行。 – Sid5427