2012-12-24 39 views
1

編輯:對不起,我忘了提及,我乘以兩個5000x5000矩陣。爲什麼增加進程數量不會減少並行代碼中的執行時間?

這是指示當我增加進程數時,時間也在增加的輸出。所以這個代碼的邏輯有問題嗎?我從網上找到它,只把它改名爲matrixMulti和它的printf。當我連接到網格實驗室並增加進程數時,在我看來邏輯但不能正常工作。所以你怎麼看?

terminal screen shot

/********************************************************************************************** 
* Matrix Multiplication Program using MPI. 
* 
* Viraj Brian Wijesuriya - University of Colombo School of Computing, Sri Lanka. 
* 
* Works with any type of two matrixes [A], [B] which could be multiplied to produce a matrix [c]. 
* 
* Master process initializes the multiplication operands, distributes the muliplication 
* operation to worker processes and reduces the worker results to construct the final output. 
* 
************************************************************************************************/ 

#include<stdio.h> 
#include<mpi.h> 
#define NUM_ROWS_A 5000 //rows of input [A] 
#define NUM_COLUMNS_A 5000 //columns of input [A] 
#define NUM_ROWS_B 5000 //rows of input [B] 
#define NUM_COLUMNS_B 5000 //columns of input [B] 
#define MASTER_TO_SLAVE_TAG 1 //tag for messages sent from master to slaves 
#define SLAVE_TO_MASTER_TAG 4 //tag for messages sent from slaves to master 
void makeAB(); //makes the [A] and [B] matrixes 
void printArray(); //print the content of output matrix [C]; 

int rank; //process rank 
int size; //number of processes 
int i, j, k; //helper variables 
double mat_a[NUM_ROWS_A][NUM_COLUMNS_A]; //declare input [A] 
double mat_b[NUM_ROWS_B][NUM_COLUMNS_B]; //declare input [B] 
double mat_result[NUM_ROWS_A][NUM_COLUMNS_B]; //declare output [C] 
double start_time; //hold start time 
double end_time; // hold end time 
int low_bound; //low bound of the number of rows of [A] allocated to a slave 
int upper_bound; //upper bound of the number of rows of [A] allocated to a slave 
int portion; //portion of the number of rows of [A] allocated to a slave 
MPI_Status status; // store status of a MPI_Recv 
MPI_Request request; //capture request of a MPI_Isend 

int main(int argc, char *argv[]) 
{ 

    MPI_Init(&argc, &argv); //initialize MPI operations 
    MPI_Comm_rank(MPI_COMM_WORLD, &rank); //get the rank 
    MPI_Comm_size(MPI_COMM_WORLD, &size); //get number of processes 

    /* master initializes work*/ 
    if (rank == 0) { 
     makeAB(); 
     start_time = MPI_Wtime(); 
     for (i = 1; i < size; i++) {//for each slave other than the master 
      portion = (NUM_ROWS_A/(size - 1)); // calculate portion without master 
      low_bound = (i - 1) * portion; 
      if (((i + 1) == size) && ((NUM_ROWS_A % (size - 1)) != 0)) {//if rows of [A] cannot be equally divided among slaves 
       upper_bound = NUM_ROWS_A; //last slave gets all the remaining rows 
      } else { 
       upper_bound = low_bound + portion; //rows of [A] are equally divisable among slaves 
      } 
      //send the low bound first without blocking, to the intended slave 
      MPI_Isend(&low_bound, 1, MPI_INT, i, MASTER_TO_SLAVE_TAG, MPI_COMM_WORLD, &request); 
      //next send the upper bound without blocking, to the intended slave 
      MPI_Isend(&upper_bound, 1, MPI_INT, i, MASTER_TO_SLAVE_TAG + 1, MPI_COMM_WORLD, &request); 
      //finally send the allocated row portion of [A] without blocking, to the intended slave 
      MPI_Isend(&mat_a[low_bound][0], (upper_bound - low_bound) * NUM_COLUMNS_A, MPI_DOUBLE, i, MASTER_TO_SLAVE_TAG + 2, MPI_COMM_WORLD, &request); 
     } 
    } 
    //broadcast [B] to all the slaves 
    MPI_Bcast(&mat_b, NUM_ROWS_B*NUM_COLUMNS_B, MPI_DOUBLE, 0, MPI_COMM_WORLD); 

    /* work done by slaves*/ 
    if (rank > 0) { 
     //receive low bound from the master 
     MPI_Recv(&low_bound, 1, MPI_INT, 0, MASTER_TO_SLAVE_TAG, MPI_COMM_WORLD, &status); 
     //next receive upper bound from the master 
     MPI_Recv(&upper_bound, 1, MPI_INT, 0, MASTER_TO_SLAVE_TAG + 1, MPI_COMM_WORLD, &status); 
     //finally receive row portion of [A] to be processed from the master 
     MPI_Recv(&mat_a[low_bound][0], (upper_bound - low_bound) * NUM_COLUMNS_A, MPI_DOUBLE, 0, MASTER_TO_SLAVE_TAG + 2, MPI_COMM_WORLD, &status); 
     for (i = low_bound; i < upper_bound; i++) {//iterate through a given set of rows of [A] 
      for (j = 0; j < NUM_COLUMNS_B; j++) {//iterate through columns of [B] 
       for (k = 0; k < NUM_ROWS_B; k++) {//iterate through rows of [B] 
        mat_result[i][j] += (mat_a[i][k] * mat_b[k][j]); 
       } 
      } 
     } 
     //send back the low bound first without blocking, to the master 
     MPI_Isend(&low_bound, 1, MPI_INT, 0, SLAVE_TO_MASTER_TAG, MPI_COMM_WORLD, &request); 
     //send the upper bound next without blocking, to the master 
     MPI_Isend(&upper_bound, 1, MPI_INT, 0, SLAVE_TO_MASTER_TAG + 1, MPI_COMM_WORLD, &request); 
     //finally send the processed portion of data without blocking, to the master 
     MPI_Isend(&mat_result[low_bound][0], (upper_bound - low_bound) * NUM_COLUMNS_B, MPI_DOUBLE, 0, SLAVE_TO_MASTER_TAG + 2, MPI_COMM_WORLD, &request); 
    } 

    /* master gathers processed work*/ 
    if (rank == 0) { 
     for (i = 1; i < size; i++) {// untill all slaves have handed back the processed data 
      //receive low bound from a slave 
      MPI_Recv(&low_bound, 1, MPI_INT, i, SLAVE_TO_MASTER_TAG, MPI_COMM_WORLD, &status); 
      //receive upper bound from a slave 
      MPI_Recv(&upper_bound, 1, MPI_INT, i, SLAVE_TO_MASTER_TAG + 1, MPI_COMM_WORLD, &status); 
      //receive processed data from a slave 
      MPI_Recv(&mat_result[low_bound][0], (upper_bound - low_bound) * NUM_COLUMNS_B, MPI_DOUBLE, i, SLAVE_TO_MASTER_TAG + 2, MPI_COMM_WORLD, &status); 
     } 
     end_time = MPI_Wtime(); 
     printf("\nRunning Time = %f\n\n", end_time - start_time); 
     printArray(); 
    } 
    MPI_Finalize(); //finalize MPI operations 
    return 0; 
} 

void makeAB() 
{ 
    for (i = 0; i < NUM_ROWS_A; i++) { 
     for (j = 0; j < NUM_COLUMNS_A; j++) { 
      mat_a[i][j] = i + j; 
     } 
    } 
    for (i = 0; i < NUM_ROWS_B; i++) { 
     for (j = 0; j < NUM_COLUMNS_B; j++) { 
      mat_b[i][j] = i*j; 
     } 
    } 
} 

void printArray() 
{ 
    for (i = 0; i < NUM_ROWS_A; i++) { 
     printf("\n"); 
     for (j = 0; j < NUM_COLUMNS_A; j++) 
      printf("%8.2f ", mat_a[i][j]); 
    } 
    printf("\n\n\n"); 
    for (i = 0; i < NUM_ROWS_B; i++) { 
     printf("\n"); 
     for (j = 0; j < NUM_COLUMNS_B; j++) 
      printf("%8.2f ", mat_b[i][j]); 
    } 
    printf("\n\n\n"); 
    for (i = 0; i < NUM_ROWS_A; i++) { 
     printf("\n"); 
     for (j = 0; j < NUM_COLUMNS_B; j++) 
      printf("%8.2f ", mat_result[i][j]); 
    } 
    printf("\n\n"); 
} 
+0

由於MPI通信涉及過短的消息。嘗試一次發送64或1024個整數 –

+0

您是否在談論low_bound和upper_bound發送? –

+0

哦btw我很遺憾,我忘記提及我乘以兩個5000x5000矩陣。 –

回答

4

發佈的代碼存在一些真實的正確性問題。讓我們來看看等級0的發送循環:

for (i = 1; i < size; i++) { 
     //... 
     low_bound = (i - 1) * portion; 
     upper_bound = low_bound + portion; 

     MPI_Isend(&low_bound, 1, MPI_INT, i, MASTER_TO_SLAVE_TAG, MPI_COMM_WORLD, &request); 
     MPI_Isend(&upper_bound, 1, MPI_INT, i, MASTER_TO_SLAVE_TAG + 1, MPI_COMM_WORLD, &request); 
     MPI_Isend(&mat_a[low_bound][0], (upper_bound - low_bound) * NUM_COLUMNS_A, MPI_DOUBLE, i, MASTER_TO_SLAVE_TAG + 2, MPI_COMM_WORLD, &request); 
    } 

你不能那樣做。如果要使用非阻塞請求,則最終必須爲請求MPI_Wait()MPI_Test(),以便您(和MPI庫)知道它們已完成。您需要對avoid leaking resources執行此操作,但在這種情況下更重要的是,在您甚至知道發送已發生之前,您會重複覆蓋low_boundupper_bound。誰知道你的工作任務正在獲得什麼數據。另外,每當你絕對保證資源泄漏時,通過覆蓋請求。

有幾種方法可以解決這個問題;最簡單的就是創建上限和下限的簡單數組,並請求數組:

if (rank == 0) { 
    makeAB(); 
    requests  = malloc(size*3*sizeof(MPI_Request)); 
    low_bounds = malloc(size*sizeof(int)); 
    upper_bounds = malloc(size*sizeof(int)); 

    start_time = MPI_Wtime(); 
    for (i = 1; i < size; i++) { 
     portion = (NUM_ROWS_A/(size - 1)); 
     low_bounds[i] = (i - 1) * portion; 
     if (((i + 1) == size) && ((NUM_ROWS_A % (size - 1)) != 0)) { 
      upper_bounds[i] = NUM_ROWS_A; 
     } else { 
      upper_bounds[i] = low_bounds[i] + portion; 
     } 

     MPI_Isend(&(low_bounds[i]), 1, MPI_INT, i, MASTER_TO_SLAVE_TAG, MPI_COMM_WORLD, &(requests[3*i])); 
     MPI_Isend(&(upper_bounds[i]), 1, MPI_INT, i, MASTER_TO_SLAVE_TAG + 1, MPI_COMM_WORLD, &(requests[3*i+1])); 
     MPI_Isend(&mat_a[low_bounds[i]][0], (upper_bounds[i] - low_bounds[i]) * NUM_COLUMNS_A, MPI_DOUBLE, i, MASTER_TO_SLAVE_TAG + 2, MPI_COMM_WORLD, &(requests[3*i+2])); 
    } 
    MPI_Waitall(3*(size-1), &(requests[3]), MPI_STATUS_IGNORE); 
    free(requests); 

關於這樣做的好處是,由於等級0保存此信息,工人不需要發送回來時,他們正在做的,等級0只需直接接收到正確的地方:

//... 
    for (i = low_bound; i < upper_bound; i++) { 
     for (j = 0; j < NUM_COLUMNS_B; j++) { 
      for (k = 0; k < NUM_ROWS_B; k++) { 
       mat_result[i][j] += (mat_a[i][k] * mat_b[k][j]); 
      } 
     } 
    } 
    MPI_Send(&mat_result[low_bound][0], (upper_bound - low_bound) * NUM_COLUMNS_B, MPI_DOUBLE, 0, SLAVE_TO_MASTER_TAG + 2, MPI_COMM_WORLD); 

//... 
if (rank == 0) { 
    for (i = 1; i < size; i++) { 
     MPI_Recv(&mat_result[low_bounds[i]][0], (upper_bounds[i] - low_bounds[i]) * NUM_COLUMNS_B, MPI_DOUBLE, i, SLAVE_TO_MASTER_TAG + 2, MPI_COMM_WORLD, &status); 
    } 

但只要你有這些價值具有分發到所有處理器陣列,您可以使用MPI_Scatter操作將比通過循環發送更高效:

for (i = 1; i < size; i++) { 
     low_bounds[i] = ... 
     upper_bounds[i] = ... 
    } 

    MPI_Scatter(low_bounds, 1, MPI_INT, &low_bound, 1, MPI_INT, 0, MPI_COMM_WORLD); 
    MPI_Scatter(upper_bounds, 1, MPI_INT, &upper_bound, 1, MPI_INT, 0, MPI_COMM_WORLD); 

理想情況下,您也可以使用散點圖或其變體來分佈A數組。

MPI_Scatter()是一個集體操作,像MPI_Bcast(),這將帶給我們你的下一個問題。在你的原始代碼中,你有:

//rank 0: 
    for (i = 1; i < size; i++) { 
     //... 
     MPI_Isend(); 
     MPI_Isend(); 
     MPI_Isend(); 
    } 

    MPI_Bcast(); 

    // other ranks: 
    MPI_Bcast(); 

    MPI_Recv(); 
    MPI_Recv(); 
    MPI_Recv(); 

集體和點對點通信的交錯可能是非常危險的,並且可能導致死鎖。這裏沒有必要;你應該把Bcast移到Scatter和Recv()之後(現在只有1個recv)。這使得你的工人任務的代碼看起來像:

MPI_Scatter(NULL, 1, MPI_INT, &low_bound, 1, MPI_INT, 0, MPI_COMM_WORLD); 
    MPI_Scatter(NULL, 1, MPI_INT, &upper_bound, 1, MPI_INT, 0, MPI_COMM_WORLD); 

    MPI_Recv(&mat_a[low_bound][0], (upper_bound - low_bound) * NUM_COLUMNS_A, MPI_DOUBLE, 0, MASTER_TO_SLAVE_TAG + 2, MPI_COMM_WORLD, &status); 
    MPI_Bcast(&mat_b, NUM_ROWS_B*NUM_COLUMNS_B, MPI_DOUBLE, 0, MPI_COMM_WORLD); 

使擺脫大部分的正確性問題,但我還是建議使用分散分佈在A數組,然後使用級別0來完成其計算的「公平份額」,同時等待工作人員的任務。 (這有一個好處,就是你的程序在size = 1時工作)。所以現在讓我們看看性能問題。

對於固定問題的大小,程序具有:

  • 分配矩陣的上界和下界(2個短消息,或2個集體)
  • 分發A矩陣((大小-1 )長的消息,大小N^2 /(大小-1)加倍)
  • 廣播B矩陣(發送N^2加倍到所有任務使用一個集體)
  • 檢索A矩陣(與發送A矩陣)

並且每個任務必須

  • 計算矩陣乘積(N^3 /(大小-1)的操作)。

很容易看出,每個等級所要完成的實際計算工作量實際上隨處理器運行的數量降低爲1 /(P-1),但通信量的工作量上升(如P或lg P,具體取決於)。在某些時候,那些在更多處理器上交叉和運行的東西只會減慢速度。那麼這點在哪裏?

單一8核Nehalem節點上做一個快速縮放測試,並使用IPM得到的時間被消耗,其中一個簡單的數,我們有:

worker | running |   | MPI 
tasks | time | Speedup | time 
--------+-----------+----------+-------- 
    1 | 90.85s |  - | 45.5s 
    2 | 45.75s | 1.99x | 15.4s 
    4 | 23.42s | 3.88x | 4.93s 
    6 | 15.75s | 5.76x | 2.51s 

其實,這是不是太糟糕; MPI時間實際上幾乎全部用於MPI_Recv(),其中節點表示複製矩陣片段的成本,並且對於等級0過程,等待結果開始從工作任務返回。這表明,排名爲0的人做了一些工作,並且用收集操作替換線性循環接收將是有用的優化。

自然地,當您離開節點或處理器數量更多時,通信成本將繼續增加,並且縮放會惡化。

更多次要點:

首先,主從通常是解決緊密耦合的數值問題用簡單的負載均衡像矩陣乘法一個相當差的方式。但我會認爲這只是一個學習MPI練習而已。注意,當然,要做到一個基於MPI的矩陣乘法的正確方法是使用現有的矩陣乘法庫,例如SCALAPACKEigen

其次,大量使用全局變量是一般非常無益的,但超出了這個問題的範圍。我也注意到,NUM_COLUMNS_A必然是NUM_ROWS_B,你不需要兩者。

1

在分離過程中,你需要平衡採取跨VS儲蓄獲得發送結果的時間。就你而言,我猜想你發送的計算比發送本地計算需要更長的時間。

嘗試與其他進程共享更大的工作塊。

+0

哦btw我很抱歉,我忘了提及我乘以兩個5000x5000矩陣 –

5

這實際上並不令人驚訝。您擁有的工作人員越多,您擁有的通信開銷就越多(分工,彙總結果),所以通常情況下,您有足夠的工作人員可以利用並行工作,而不是那麼多的工人通信開銷開始成爲問題。隨着核心數量的增加,工作量變小,通信開銷增加,收益遞減。這就是爲什麼在編寫並行應用程序時,需要進行大量工作來衡量哪些工作人員獲得了最佳性能,並設計了可將開銷最小化的網絡結構。

相關問題