2011-07-11 171 views
2

這個問題是關於在C++中實現以下模擬的最佳策略。什麼是更好的實施策略?

我試圖做一個模擬作爲一個物理研究項目的一部分,它基本上追蹤空間節點鏈的動態。每個節點都包含一個位置以及某些參數(局部曲率,速度,到鄰居的距離等),這些參數都會隨時間演化。

每次步驟可以被分解到這四個部分:

  • 計算局部參數。值取決於鏈中最近的鄰居。
  • 計算全局參數。
  • 不斷髮展。根據全局和本地參數以及一些隨機力場,每個節點的位置移動很少。
  • 填充。如果兩個連續節點之間的距離達到臨界值,則插入新節點。

(此外,節點可以被卡住,這使他們處於非活動狀態的模擬的其餘部分。與不活躍的鄰居不活動節點的本地參數,不會改變,並且不需要任何更多的計算。)

每個節點包含〜60個字節,我在鏈中有〜100 000個節點,我需要發展大約1 000 000個時間步長的鏈。不過,我希望最大限度地提高這些數字,因爲這會增加我的模擬的準確性,但是在合理的時間(〜小時)內完成模擬的限制。 (約30%的節點將處於非活動狀態)。

我已經開始在C++中將此模擬實現爲雙向鏈表。這看起來很自然,因爲我需要在現有節點之間插入新節點,並且因爲本地參數取決於最近的鄰居。 (我添加了一個額外的指針到下一個活動節點,以避免不必要的計算,每當我遍歷整個鏈時)。我並不是專家,當談到並行化(或編碼)時,我已經玩過OpenMP,我真的很喜歡我如何通過兩行代碼加速獨立操作的循環。我不知道如何讓我的鏈表並行執行,或者它甚至可以工作(?)。所以我有了使用stl向量的想法。在哪裏沒有指向最近鄰居的指針,我可以存儲鄰居的索引並通過標準查找訪問它們。我還可以通過鏈的位置(每第x個時間步)對矢量進行排序,以獲得更好的內存位置。這種方法將允許循環OpenMP方式。

我有點害怕這個想法,因爲我不必處理內存管理。我猜想stl向量的實現比我在列表中處理節點的簡單的'new'和'delete'方式要好得多。我知道我可以用stl list做同樣的事情,但我不喜歡用迭代器訪問最近鄰居的方式。

所以我問你,1337 h4x0r和熟練的程序員,我的模擬會是什麼樣的更好的設計?矢量方法是否勾畫出一個好主意?或者有什麼技巧可以在鏈接列表上播放以使其與OpenMP一起使用?或者我應該考慮一個完全不同的方法?

模擬將運行在8核心和48G內存的計算機上,所以我想我可以交易很多內存的速度。

在此先感謝

編輯: 我需要添加1-2%的新節點每個時間段,所以其存儲的載體,而不指數以最近的鄰居,除非我排序的載體將無法正常工作每一步。

+0

您的本地參數如何本地化,以及如何計算全局屬性? –

+0

@Jonathan Dursi:局部參數是局部的,因爲它們只依賴鏈中最近的鄰居。例如:局部曲率通過擬合三個點(一個節點和它的鄰居)到一個圓來近似。全局參數的例子是鏈長度,連續節點之間的距離之和。這是否回答你的問題? – jonalm

+1

我想我會提及 - 如果你沒有遇到它 - gnu_parallel擴展(MSVC有類似的)。基本上std :: library在後臺與openmp並行。如果你用stl編寫你的循環(例如for_each,accumulate,inner_product等) - 那麼你可以先編寫你的代碼的序列版本(以便正確模擬仿真),然後在幾乎平行的情況下對它進行平行處理(幾乎是因爲你仍然需要讓你的容器 - 但是你實現它們 - 線程安全)。我發現它很有幫助。 http://gcc.gnu.org/onlinedocs/libstdc++/manual/parallel_mode.html – Tom

回答

2

這是一個經典的權衡問題。使用數組或std :: vector將使計算速度更快,插入速度更慢;使用雙向鏈表或std :: list將使插入速度更快,計算速度更慢。

判斷折衷問題的唯一方法是經驗性的;這對你的特定應用程序來說會更快?你所能做的只是嘗試雙方的看法。計算越緊密,模板越短(例如,計算強度 - 必須爲每個內存量帶來多少觸發器),標準數組就越不重要。但基本上你應該用兩種方式來模擬你的基本計算的實現,看看它是否重要。我一起砍了一個非常原油去東西與std :: vector和std :: list;這在很多方面都可能是錯誤的,但是你可以放棄一些參數並看看哪些參數取勝。在我給出的計算大小和數量的系統上,列表速度更快,但它可以很容易地以任何一種方式進行。

W/rt openmp,是的,如果這就是你要去的方式,你的雙手有點束縛;你幾乎可以肯定必須使用矢量結構,但首先你應該確保插入的額外成本不會消除任何多核的好處。

#include <iostream> 
#include <list> 
#include <vector> 
#include <cmath> 
#include <sys/time.h> 
using namespace std; 

struct node { 
    bool stuck; 
    double x[2]; 
    double loccurve; 
    double disttoprev; 
}; 

void tick(struct timeval *t) { 
    gettimeofday(t, NULL); 
} 

/* returns time in seconds from now to time described by t */ 
double tock(struct timeval *t) { 
    struct timeval now; 
    gettimeofday(&now, NULL); 
    return (double)(now.tv_sec - t->tv_sec) + 
     ((double)(now.tv_usec - t->tv_usec)/1000000.); 
} 

int main() 
{ 
    const int nstart = 100; 
    const int niters = 100; 
    const int nevery = 30; 
    const bool doPrint = false; 
    list<struct node> nodelist; 
    vector<struct node> nodevect; 

    // Note - vector is *much* faster if you know ahead of time 
    // maximum size of vector 
    nodevect.reserve(nstart*30); 

    // Initialize 
    for (int i = 0; i < nstart; i++) { 
     struct node *mynode = new struct node; 
     mynode->stuck = false; 
     mynode->x[0] = i; mynode->x[1] = 2.*i; 
     mynode->loccurve = -1; 
     mynode->disttoprev = -1; 

     nodelist.push_back(*mynode); 
     nodevect.push_back(*mynode); 
    } 

    const double EPSILON = 1.e-6; 
    struct timeval listclock; 
    double listtime; 

    tick(&listclock); 
    for (int i=0; i<niters; i++) { 
     // Calculate local curvature, distance 

     list<struct node>::iterator prev, next, cur; 
     double dx1, dx2, dy1, dy2; 

     next = cur = prev = nodelist.begin(); 
     cur++; 
     next++; next++; 
     dx1 = prev->x[0]-cur->x[0]; 
     dy1 = prev->x[1]-cur->x[1]; 

     while (next != nodelist.end()) { 
      dx2 = cur->x[0]-next->x[0]; 
      dy2 = cur->x[1]-next->x[1]; 

      double slope1 = (dy1/(dx1+EPSILON)); 
      double slope2 = (dy2/(dx2+EPSILON)); 

      cur->disttoprev = sqrt(dx1*dx1 + dx2*dx2); 

      cur->loccurve = (slope1*slope2*(dy1+dy2) + 
           slope2*(prev->x[0]+cur->x[0]) - 
           slope1*(cur->x[0] +next->x[0]))/
          (2.*(slope2-slope1) + EPSILON); 

      next++; 
      cur++; 
      prev++; 
     } 

     // Insert interpolated pt every neveryth pt 
     int count = 1; 
     next = cur = nodelist.begin(); 
     next++; 
     while (next != nodelist.end()) { 
      if (count % nevery == 0) { 
       struct node *mynode = new struct node; 
       mynode->x[0] = (cur->x[0]+next->x[0])/2.; 
       mynode->x[1] = (cur->x[1]+next->x[1])/2.; 
       mynode->stuck = false; 
       mynode->loccurve = -1; 
       mynode->disttoprev = -1; 
       nodelist.insert(next,*mynode); 
      } 
      next++; 
      cur++; 
      count++; 
     } 
    } 
                   51,0-1  40% 

    struct timeval vectclock; 
    double vecttime; 

    tick(&vectclock); 
    for (int i=0; i<niters; i++) { 
     int nelem = nodevect.size(); 
     double dx1, dy1, dx2, dy2; 
     dx1 = nodevect[0].x[0]-nodevect[1].x[0]; 
     dy1 = nodevect[0].x[1]-nodevect[1].x[1]; 

     for (int elem=1; elem<nelem-1; elem++) { 
      dx2 = nodevect[elem].x[0]-nodevect[elem+1].x[0]; 
      dy2 = nodevect[elem].x[1]-nodevect[elem+1].x[1]; 

      double slope1 = (dy1/(dx1+EPSILON)); 
      double slope2 = (dy2/(dx2+EPSILON)); 

      nodevect[elem].disttoprev = sqrt(dx1*dx1 + dx2*dx2); 

      nodevect[elem].loccurve = (slope1*slope2*(dy1+dy2) + 
           slope2*(nodevect[elem-1].x[0] + 
             nodevect[elem].x[0]) - 
           slope1*(nodevect[elem].x[0] + 
             nodevect[elem+1].x[0]))/
          (2.*(slope2-slope1) + EPSILON); 

     } 

     // Insert interpolated pt every neveryth pt 
     int count = 1; 
     vector<struct node>::iterator next, cur; 
     next = cur = nodevect.begin(); 
     next++; 
     while (next != nodevect.end()) { 
      if (count % nevery == 0) { 
       struct node *mynode = new struct node; 
       mynode->x[0] = (cur->x[0]+next->x[0])/2.; 
       mynode->x[1] = (cur->x[1]+next->x[1])/2.; 
       mynode->stuck = false; 
       mynode->loccurve = -1; 
       mynode->disttoprev = -1; 
       nodevect.insert(next,*mynode); 
      } 
      next++; 
      cur++; 
      count++; 
     } 
    } 
    vecttime = tock(&vectclock); 

    cout << "Time for list: " << listtime << endl; 
    cout << "Time for vect: " << vecttime << endl; 

    vector<struct node>::iterator v; 
    list <struct node>::iterator l; 

    if (doPrint) { 
     cout << "Vector: " << endl; 
     for (v=nodevect.begin(); v!=nodevect.end(); ++v) { 
      cout << "[ (" << v->x[0] << "," << v->x[1] << "), " << v->disttoprev << ", " << v->loccurve << "] " << endl; 
     } 

     cout << endl << "List: " << endl; 
     for (l=nodelist.begin(); l!=nodelist.end(); ++l) { 
      cout << "[ (" << l->x[0] << "," << l->x[1] << "), " << l->disttoprev << ", " << l->loccurve << "] " << endl; 
     } 

    } 

    cout << "List size is " << nodelist.size() << endl; 
} 
+0

感謝您的回答並輸入每個人。接受喬納森回答,因爲他爲我寫了一個測試。謝謝!認爲不適合矢量版本,因爲我可以很容易地進行並行處理。 – jonalm

1

假設創造新元素的情況發生相對較少,我想借此排序向量方法,爲你所列出的所有原因:

  • 沒有浪費時間以下指導/指數圍繞
  • 充分利用空間局部性的
  • 更容易parallelise

當然,對於這個工作,你必須確保該載體是總是排序,不是簡單的每第k個時間步。

+0

元素的創建經常發生。我需要每個時間步添加1-2%的新節點。然後,我仍然必須每隔一步對矢量進行排序。 – jonalm

+0

@jonalm:啊哈。你應該把這個事實添加到你原來的問題中。 –

0

這對於並行編程的學生來說看起來很不錯。

你似乎有一個數據結構,自然地導致分佈,一個鏈。您可以對(半)靜態分配給不同線程的子鏈做相當多的工作。您可能需要分別處理N-1個邊界情況,但是如果子鏈長度大於3,那麼這些情況會彼此分離。

當然,在每一步之間您都必須更新全局變量,但鏈長等變量是簡單的並行添加。只需計算每個子鏈的長度,然後再添加它們。如果您的子鏈長度爲100000/8,則單線程工作是在這些步驟之間添加這8個子鏈長度。

如果節點的增長非常不均勻,則可能需要每隔一段時間重新平衡子鏈長度。

相關問題