2013-08-06 115 views
1

我已經看到了很多這樣的帖子,儘管做了很多的閱讀,但我似乎無法獲得以下代碼來在OpenMP中正確並行化,因爲串行版本當前運行得非常多比這更快:OpenMP並行化代碼運行速度較慢

static double red_black_parallel_for_step(simulation* simObj, double stepSize, double* red, double* black){ 
double tmp = 0.0; 
double avg = 0.0; 
double old = 0.0; 
double max = -HUGE_VAL; 
#pragma omp parallel \ 
shared(black, red, max) \ 
firstprivate(old, avg, tmp) 
{ 
    double priv_max = -HUGE_VAL; 
    #pragma omp for 
    for(unsigned int j = 0; j < (*simObj).NY+2; j++){ 
     for(unsigned int i = 0; i < (int)floor((double)((*simObj).NX+2.0)/2.0); i++){ 
      for(unsigned int k = 1; k < (*simObj).NZ; k++){ 
       if(red[IX3] == HUGE_VAL) continue; 
       old = red[IX3]; 
       avg = 0.0; 
       const int x1 = (black[IX3+IX3_XR1STEP] != HUGE_VAL); 
       const int x2 = (black[IX3+IX3_XR2STEP] != HUGE_VAL); 
       const int y1 = (black[IX3+IX3_YSTEP] != HUGE_VAL); 
       const int y2 = (black[IX3-IX3_YSTEP] != HUGE_VAL); 
       const int z1 = (black[IX3+IX3_ZSTEP] != HUGE_VAL); 
       const int z2 = (black[IX3-IX3_ZSTEP] != HUGE_VAL); 
       if (x1) avg += black[IX3+IX3_XR1STEP]; 
       if (x2) avg += black[IX3+IX3_XR2STEP]; 
       if (y1) avg += black[IX3+IX3_YSTEP]; 
       if (y2) avg += black[IX3-IX3_YSTEP]; 
       if (z1) avg += black[IX3+IX3_ZSTEP]; 
       if (z2) avg += black[IX3-IX3_ZSTEP]; 
       avg /= (double) (x1+x2+y1+y2+z1+z2); 
       red[IX3] = old + stepSize * (avg - old); 
       tmp = fabs(old - red[IX3])/fabs(old); 
       if(tmp > priv_max) priv_max = tmp; 
      } 
     } 
    } 
    #pragma omp flush (max) 
    if (priv_max > max) { 
     #pragma omp critical 
     { 
      if (priv_max > max) max = priv_max; 
     } 
    } 
} 
#pragma omp parallel \ 
shared(black, red, max) \ 
firstprivate(old, avg, tmp) 
{ 
    double priv_max = -HUGE_VAL; 
    #pragma omp for 
    for(unsigned int j = 0; j < (*simObj).NY+2; j++){ 
     for(unsigned int i = 0; i < (int)floor((double)((*simObj).NX+2)/2.0); i++){ 
      for (unsigned int k = 1; k < (*simObj).NZ; k++){ 
       if (black[IX3] == HUGE_VAL) continue; 
       old = black[IX3]; 
       avg = 0.0; 
       const int x1 = (red[IX3+IX3_XB1STEP] != HUGE_VAL); 
       const int x2 = (red[IX3+IX3_XB2STEP] != HUGE_VAL); 
       const int y1 = (red[IX3+IX3_YSTEP] != HUGE_VAL); 
       const int y2 = (red[IX3-IX3_YSTEP] != HUGE_VAL); 
       const int z1 = (red[IX3+IX3_ZSTEP] != HUGE_VAL); 
       const int z2 = (red[IX3-IX3_ZSTEP] != HUGE_VAL); 
       if (x1) avg += red[IX3+IX3_XB1STEP]; 
       if (x2) avg += red[IX3+IX3_XB2STEP]; 
       if (y1) avg += red[IX3+IX3_YSTEP]; 
       if (y2) avg += red[IX3-IX3_YSTEP]; 
       if (z1) avg += red[IX3+IX3_ZSTEP]; 
       if (z2) avg += red[IX3-IX3_ZSTEP]; 
       avg /= (double) (x1+x2+y1+y2+z1+z2); 
       black[IX3] = old + stepSize * (avg - old); 
       tmp = fabs(old - black[IX3])/fabs(old); 
       if(tmp > priv_max) priv_max = tmp; 
      } 
     } 
    } 
    #pragma omp flush (max) 
    if (priv_max > max) { 
     #pragma omp critical 
     { 
      if (priv_max > max) max = priv_max; 
     } 
    } 
} 
return max; 
} 

複雜的因素是我需要跟蹤紅色/黑色迭代之間的最大相對變化(最大)。任何幫助將非常感激。

+0

不讀你的代碼你prolly其是多處理器工作時,你必須將數據複製到異地的問題,許多時間將超過並行化的收益 – aaronman

+0

我只是將'priv_max'的值分別存儲在所有迭代中,然後在最後找到這些值的最大值。如果你有太多的迭代,你仍然需要/想要並行執行,你只需要幾步就可以完成 - 例如,將'priv_max'的數組分成N個部分。並行查找每個片段的最大值,然後按順序查找這些N的最大值。對於'N',你可能想調用'omp_get_max_threads()'。 –

+0

謝謝@JerryCoffin。正如你所看到的,每個線程都擁有它自己的priv_max版本,並且在最後一個(外部)循環之後,它們被調整以選擇最大值,並將最大值分配給哪個最大值。只有7次調用此功能時,並行版本需要約45秒,並在17秒內連續運行。循環迭代的總數大約爲10^7。我認爲不同的priv_max可以解決這個問題。 – Ryan

回答

0

嘗試僅僅比較之後,並且只在關鍵塊沖洗:

/* not here: #pragma omp flush (max) */ 
if (priv_max > max) { // this should filter out most of the flush operations 
    #pragma omp critical 
    { 
     if (priv_max > max) max = priv_max; // now flush; this operation will be exclusive/"critical" 
     #pragma omp flush (max) 
    } 
}