2015-09-06 95 views
4

我想了解使用MPI並行處理阻塞和非阻塞消息傳遞機制之間的區別。假設我們有以下阻止代碼:如何修改MPI阻塞發送和接收到非阻塞

#include <stdio.h> 
#include <string.h> 
#include "mpi.h" 

int main (int argc, char* argv[]) { 
    const int maximum_message_length = 100; 
    const int rank_0= 0; 
    char message[maximum_message_length+1]; 
    MPI_Status status; /* Info about receive status */ 
    int my_rank; /* This process ID */ 
    int num_procs; /* Number of processes in run */ 
    int source; /* Process ID to receive from */ 
    int destination; /* Process ID to send to */ 
    int tag = 0; /* Message ID */ 

    MPI_Init(&argc, &argv); 
    MPI_Comm_rank(MPI_COMM_WORLD, &my_rank); 
    MPI_Comm_size(MPI_COMM_WORLD, &num_procs); 

    /* clients processes */ 
    if (my_rank != server_rank) { 
     sprintf(message, "Hello world from process# %d", my_rank); 
     MPI_Send(message, strlen(message) + 1, MPI_CHAR, rank_0, tag, MPI_COMM_WORLD); 
    } else {  
    /* rank 0 process */ 
     for (source = 0; source < num_procs; source++) { 
      if (source != rank_0) { 
       MPI_Recv(message, maximum_message_length + 1, MPI_CHAR, source, tag, 
       MPI_COMM_WORLD,&status); 
       fprintf(stderr, "%s\n", message); 
      } 
     } 
    } 
     MPI_Finalize(); 
} 

每個處理器執行其任務,併發送回_6(接收器)。 rank_0將運行從1到n-1個進程的循環並按順序打印它們(如果當前客戶端尚未發送任務,我在循環中的步驟可能無法繼續)。如何修改此代碼以實現使用MPI_IsendMPI_Irecv的非阻塞機制?我是否需要刪除接收器部分(_6)環路和明確說明MPI_Irecv(..)爲每一個客戶,即

MPI_Irecv(message, maximum_message_length + 1, MPI_CHAR, source, tag, 
        MPI_COMM_WORLD,&status); 

謝謝。

+0

你還是希望有在接收過程中,當印刷一些秩序消息? – haraldkl

+0

@haraldkl在OP的代碼中沒有任何書面保證書,因爲無論如何都將以任何順序接收消息,所以我不認爲。 – NoseKnowsAll

+0

不,訂單沒關係 –

回答

4

你使用非阻塞通信做什麼是發佈通信,然後立即繼續與您的程序做其他的事情,這可能會發布更多的通信。尤其是,您可以立即發佈所有收到的郵件,然後等待郵件完成。 這是您通常會在您的場景中執行的操作。

但請注意,這個特定的設置是一個不好的例子,因爲它基本上只是重新實現MPI_Gather

以下是您通常會如何處理設置中的非阻塞通信。首先,您需要一些存儲空間來存放所有消息,並且還需要一系列請求句柄來跟蹤非阻塞通信請求,因此您的代碼的第一部分需要相應更改:

#include <stdlib.h> 
#include <stdio.h> 
#include <string.h> 
#include "mpi.h" 

int main (int argc, char* argv[]) { 
    const int maximum_message_length = 100; 
    const int server_rank = 0; 
    char message[maximum_message_length+1]; 
    char *allmessages; 
    MPI_Status *status; /* Info about receive status */ 
    MPI_Request *req; /* Non-Blocking Requests */ 
    int my_rank; /* This process ID */ 
    int num_procs; /* Number of processes in run */ 
    int source; /* Process ID to receive from */ 
    int tag = 0; /* Message ID */ 

    MPI_Init(&argc, &argv); 
    MPI_Comm_rank(MPI_COMM_WORLD, &my_rank); 
    MPI_Comm_size(MPI_COMM_WORLD, &num_procs); 

    /* clients processes */ 
    if (my_rank != server_rank) { 
     sprintf(message, "Hello world from process# %d", my_rank); 
     MPI_Send(message, maximum_message_length + 1, MPI_CHAR, server_rank, 
       tag, MPI_COMM_WORLD); 
    } else { 

這裏不需要無阻塞發送。現在我們繼續並在server_rank上接收所有這些消息。我們需要遍歷所有這些,存儲每個人的請求句柄:

/* rank 0 process */ 
     allmessages = malloc((maximum_message_length+1)*num_procs); 
     status = malloc(sizeof(MPI_Status)*num_procs); 
     req = malloc(sizeof(MPI_Request)*num_procs); 

     for (source = 0; source < num_procs; source++) { 
      req[source] = MPI_REQUEST_NULL; 
      if (source != server_rank) { 
       /* Post non-blocking receive for source */ 
       MPI_Irecv(allmessages+(source*(maximum_message_length+1)), 
          maximum_message_length + 1, MPI_CHAR, source, tag, 
          MPI_COMM_WORLD, req+source); 
       /* Proceed without waiting on the receive */ 
       /* (posting further receives */ 
      } 
     } 
     /* Wait on all communications to complete */ 
     MPI_Waitall(num_procs, req, status); 
     /* Print the messages in order to the screen */ 
     for (source = 0; source < num_procs; source++) { 
      if (source != server_rank) { 
       fprintf(stderr, "%s\n", 
         allmessages+(source*(maximum_message_length+1))); 
      } 
     } 
    } 
    MPI_Finalize(); 
} 

,我們需要等待它們全部完成,打印在消息發佈非阻塞接收後正確的順序。爲此,我們使用了一個MPI_Waitall,這允許我們阻塞,直到滿足所有請求句柄。請注意,爲簡單起見,我在此包含server_rank,但最初將其請求設置爲MPI_REQUEST_NULL,因此它將被忽略。 如果您不關心訂單,可以通過循環處理請求並使用MPI_Waitany來儘快處理通信。只要任何溝通完成,您就可以根據相應的數據採取行動。

隨着MPI_Gather的代碼應該是這樣的:

#include <stdlib.h> 
#include <stdio.h> 
#include <string.h> 
#include "mpi.h" 

int main (int argc, char* argv[]) { 
    const int maximum_message_length = 100; 
    const int server_rank = 0; 
    char message[maximum_message_length+1]; 
    char *allmessages; 
    int my_rank; /* This process ID */ 
    int num_procs; /* Number of processes in run */ 
    int source; /* Process ID to receive from */ 
    int tag = 0; /* Message ID */ 

    MPI_Init(&argc, &argv); 
    MPI_Comm_rank(MPI_COMM_WORLD, &my_rank); 
    MPI_Comm_size(MPI_COMM_WORLD, &num_procs); 

    if (my_rank == server_rank) { 
     allmessages = malloc((maximum_message_length+1)*num_procs); 
    } 
    sprintf(message, "Hello world from process# %d", my_rank); 
    MPI_Gather(message, (maximum_message_length+1), MPI_CHAR, 
       allmessages, (maximum_message_length+1), MPI_CHAR, 
       server_rank, MPI_COMM_WORLD); 

    if (my_rank == server_rank) { 
     /* Print the messages in order to the screen */ 
     for (source = 0; source < num_procs; source++) { 
      if (source != server_rank) { 
       fprintf(stderr, "%s\n", 
         allmessages+(source*(maximum_message_length+1))); 
      } 
     } 
    } 
    MPI_Finalize(); 
} 

並與MPI-3你甚至可以使用非阻塞MPI_Igather

如果你不關心排序,最後一部分(從MPI_Waitall)可以用MPI_Waitany就像這樣:

for (i = 0; i < num_procs-1; i++) { 
     /* Wait on any next communication to complete */ 
     MPI_Waitany(num_procs, req, &source, status); 
     fprintf(stderr, "%s\n", 
       allmessages+(source*(maximum_message_length+1))); 
    } 
+0

非常感謝。這很好解釋。 –

+0

@MikeH。很高興聽到這個消息。 – haraldkl