2012-03-13 98 views
1

進程間通信我有一個OpenMP的並行處理程序,它看起來像這樣:線程在OpenMP的

[...] 
#pragma omp parallel 
{ 
//initialize threads 

#pragma omp for 
for(...) 
    { 
    //Work is done here 

    } 

} 

現在我加入MPI支持。我需要的是一個處理通信的線程,在我的情況下,它始終調用GatherAll並填充/清空一個鏈接列表,用於從其他進程接收/發送數據。該線程應該發送/接收直到標誌被設置。所以現在在這個例子中沒有MPI的東西,我的問題是在OpenMP中實現這個例程。 如何實現這樣的線程?例如,我試圖引進一個單一的指令,在這裏:

[...] 
int kill=0 
#pragma omp parallel shared(kill) 
{ 
//initialize threads 
#pragma omp single nowait 
{ 
    while(!kill) 
    send_receive(); 
} 
#pragma omp for 
for(...) 
    { 
    //Work is done here 

    } 
kill=1 

} 

但在這種情況下,程序卡住,因爲for循環等待上面的while循環線程後的隱性障礙。

謝謝rugermini。

+0

所以你幾乎想要單曲和單曲同時執行,以及何時完成停止單個? – Tudor 2012-03-13 14:08:21

+0

是的,正好... – rugermini 2012-03-13 14:12:29

回答

0

你可以嘗試添加nowait子句您single構造:

編輯:應對第一個註釋

如果啓用了OpenMP的嵌套並行,你也許能達到你想要什麼通過兩個層次的並行性。在頂層,你有兩個並行的並行部分,一個用於MPI通信,另一個用於本地計算。最後一部分本身可以並行化,這給了你第二層的並行化。只有執行這個級別的線程纔會受到障礙的影響。

#include <iostream> 
#include <omp.h> 

int main() 
{ 
    int kill = 0; 
#pragma omp parallel sections 
    { 
#pragma omp section 
    { 
     while (kill == 0){ 
     /* manage MPI communications */ 
     } 
    } 

#pragma omp section 
    { 
#pragma omp parallel 
#pragma omp for 
     for (int i = 0; i < 10000 ; ++i) { 
     /* your workload */ 
     } 
     kill = 1; 
    } 
    } 
} 

但是,你必須知道你的代碼將打破,如果你不至少有兩個線程,這意味着你打破假設代碼的順序和並行版本應該做的一樣的東西。

將OpenMP內核封裝到更全局的MPI通信方案(可能使用異步通信將通信與計算重疊)中會更清晰。

+0

對不起,忘了寫它。我在代碼中有這個條款。 – rugermini 2012-03-13 14:17:27

+0

謝謝大家,這似乎工作! – rugermini 2012-03-13 15:17:03

0

嗯。如果你確實在你的程序中加入了MPI「支持」,那麼你應該使用mpi_allgather,因爲mpi_gatherall不存在。請注意,mpi_allgather是一個集體操作,也就是所有通信器中的進程調用它。當其他進程執行任何操作時,不能有進程收集數據。你可以做的是使用MPI 單面通信來實現你的想法;這將有點棘手,但不過是如果一個進程只讀取其他進程的內存。

我很困惑你使用術語「線程」和MPI。我擔心你對OpenMP和MPI的混淆,其中一個變種叫做OpenMPI。儘管這個名字與OpenMP不同,但與奶酪的粉筆不同。 MPI程序是根據進程編寫的,而不是線程。典型的OpenMP實現確實使用線程,但細節通常對程序員來說是很好的隱藏的。

對於您正在嘗試或似乎在嘗試在您的OpenMP代碼中使用MPI'內部',我印象非常深刻。這與我所做的工作完全相反,並且看到其他人在一些嚴重的大型計算機上進行。這種「混合」並行化的標準模式是編寫調用OpenMP代碼的MPI程序。當今許多非常大的計算機都包含實際上多核盒子的集合。一種典型的編程方法是在每個盒子上運行一個MPI進程,並且每個進程爲盒子中的每個核心使用一個OpenMP線程。

+0

這正是我想要做的:每個節點一個MPI進程,每個進程有許多OpenMP線程。你是對的,這是一個HPC代碼。對不起,寫了allgather而不是gatherall。 – rugermini 2012-03-13 14:33:23

+0

我想我看到了問題;好的,那是令人困惑的。在我上面的代碼中沒有mpi相關的東西。我只是試圖提出我提出這個問題的動機。 – rugermini 2012-03-13 14:38:07

0

你必須小心,因爲你不能讓你的MPI調用線程「跳過」omp for循環;線程團隊中的所有線程都必須經過for循環。

有幾個方法可以做到這一點:嵌套並行性和任務,你可以啓動一個任務做消息傳遞和花葯調用它有一個OMP平行於它的日常工作:

#include <mpi.h> 
#include <omp.h> 
#include <stdio.h> 

void work(int rank) { 
    const int n=14; 
    #pragma omp parallel for 
    for (int i=0; i<n; i++) { 
     int tid = omp_get_thread_num(); 
     printf("%d:%d working on item %d\n", rank, tid, i); 
    } 
} 

void sendrecv(int rank, int sneighbour, int rneighbour, int *data) { 
    const int tag=1; 
    MPI_Sendrecv(&rank, 1, MPI_INT, sneighbour, tag, 
        data, 1, MPI_INT, rneighbour, tag, 
        MPI_COMM_WORLD, MPI_STATUS_IGNORE); 
} 

int main(int argc, char **argv) { 
    int rank, size; 
    int sneighbour; 
    int rneighbour; 
    int data; 
    int got; 

    MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &got); 
    MPI_Comm_size(MPI_COMM_WORLD,&size); 
    MPI_Comm_rank(MPI_COMM_WORLD,&rank); 

    omp_set_nested(1); 
    sneighbour = rank + 1; 
    if (sneighbour >= size) sneighbour = 0; 
    rneighbour = rank - 1; 
    if (rneighbour <0) rneighbour = size-1; 

    #pragma omp parallel 
    { 
     #pragma omp single 
     { 
      #pragma omp task 
      { 
       sendrecv(rank, sneighbour, rneighbour, &data); 
       printf("Got data from %d\n", data); 
      } 

      #pragma omp task 
      work(rank); 
     } 
    } 


    MPI_Finalize(); 
    return 0; 
} 

或者,你可以讓你的for循環schedule(dynamic) OMP使其他線程可以同時主線程發送挑選了一些鬆弛的,並且當它這樣做的主線程可以拿起一些工作:

#include <mpi.h> 
#include <omp.h> 
#include <stdio.h> 

void sendrecv(int rank, int sneighbour, int rneighbour, int *data) { 
    const int tag=1; 
    MPI_Sendrecv(&rank, 1, MPI_INT, sneighbour, tag, 
        data, 1, MPI_INT, rneighbour, tag, 
        MPI_COMM_WORLD, MPI_STATUS_IGNORE); 
} 

int main(int argc, char **argv) { 
    int rank, size; 
    int sneighbour; 
    int rneighbour; 
    int data; 
    int got; 
    const int n=14; 

    MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &got); 
    MPI_Comm_size(MPI_COMM_WORLD,&size); 
    MPI_Comm_rank(MPI_COMM_WORLD,&rank); 

    omp_set_nested(1); 
    sneighbour = rank + 1; 
    if (sneighbour >= size) sneighbour = 0; 
    rneighbour = rank - 1; 
    if (rneighbour <0) rneighbour = size-1; 

    #pragma omp parallel 
    { 
      #pragma omp master 
      { 
       sendrecv(rank, sneighbour, rneighbour, &data); 
       printf("Got data from %d\n", data); 
      } 

      #pragma omp for schedule(dynamic) 
      for (int i=0; i<n; i++) { 
       int tid = omp_get_thread_num(); 
       printf("%d:%d working on item %d\n", rank, tid, i); 
      } 
    } 


    MPI_Finalize(); 
    return 0; 
}