2010-12-14

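MPI: Cancelling a non-blocking send

I'm using the Open MPI library to implement the following algorithm. We have two processes, p1 and p2. Both run some iterations, and at the end of each iteration they communicate their results. The problem is that the execution is not necessarily balanced, so p1 may run 10 iterations in the time p2 runs one. Even so, I want p2 to read the latest result, from the last iteration p1 completed.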

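So my idea is that p1 sends its result at every iteration. However, before sending the result of iteration i, it should check whether p2 has actually read the information from iteration i-1. If it hasn't, p1 should cancel the previous send, so that when p2 reads from p1 it gets the most recent result.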

Unfortunately, I don't know how to do this. I have tried using MPI_Cancel, as in the following code:

#include <stdio.h>
#include <unistd.h>   /* sleep() */
#include <mpi.h>

int main(int argc, char *argv[]) {

    int myrank, numprocs;
    MPI_Status status;
    MPI_Request request;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);

    if (myrank == 0) {
        int send_buf = 1, flag;
        MPI_Isend(&send_buf, 1, MPI_INT, 1, 123, MPI_COMM_WORLD,
                  &request);
        /* try to withdraw the first message before rank 1 receives it */
        MPI_Cancel(&request);
        MPI_Wait(&request, &status);
        MPI_Test_cancelled(&status, &flag);
        if (flag) printf("Send cancelled\n");
        else printf("Send NOT cancelled\n");
        /* send_buf may be reused only because MPI_Wait has completed the first request */
        send_buf = 2;
        /* note: this second request is never completed, and rank 1 receives only one message */
        MPI_Isend(&send_buf, 1, MPI_INT, 1, 123, MPI_COMM_WORLD,
                  &request);
    }
    else {
        sleep(5);   /* make rank 1 deliberately slow */
        int msg;
        MPI_Recv(&msg, 1, MPI_INT, 0, 123,
                 MPI_COMM_WORLD, &status);
        printf("%d\n", msg);
    }
    MPI_Finalize();

    return 0;
}

However, when I run it, it reports that the send could not be cancelled, and p2 prints 1 instead of 2.

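I'd like to know whether there is any way to achieve what I described, or whether there is an alternative way to code the interaction between p1 and p2.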


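Cancelling a send is evil and you should not use it. There is also no guarantee that you can actually cancel a send. If you think cancelling will stop the remote side from being written to, you will be disappointed. – Jeff 2015-10-15 21:08:16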

Answers


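I would invert the control of the communication. Instead of p1 sending unnecessary messages that it then has to cancel, p2 should signal that it is ready to receive, and only then should p1 send. In the meantime, p1 simply overwrites its send buffer with the latest result.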

In (untested) code:

if (rank == 0)
{
    int ready_msg;   // buffer for p2's "ready" message (its content is ignored)
    int ready;       // completion flag written by MPI_Test (must not be the Irecv buffer)
    MPI_Request p2_request;
    MPI_Status p2_status;
    // initial request
    MPI_Irecv(&ready_msg, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, &p2_request);
    for (int i = 0; ; i++)
    {
        sleep(1);    // stands in for one iteration of work; i is the latest result
        MPI_Test(&p2_request, &ready, &p2_status);
        if (ready)
        {
            // blocking send: p2 is ready to receive
            MPI_Send(&i, 1, MPI_INT, 1, 123, MPI_COMM_WORLD);
            // post new request
            MPI_Irecv(&ready_msg, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, &p2_request);
        }
    }
}
else
{
    int msg = 0;
    MPI_Status status;
    for (;;)
    {
        sleep(5);
        // actual message content doesn't matter, just let p1 know we're ready
        MPI_Send(&msg, 1, MPI_INT, 0, 123, MPI_COMM_WORLD);
        // receive the most recent result
        MPI_Recv(&msg, 1, MPI_INT, 0, 123, MPI_COMM_WORLD, &status);
    }
}

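Now, as I said, this is untested code, but you can probably see what I'm getting at. MPI_Cancel should only be used when something has gone seriously wrong: no message should be cancelled during normal execution.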


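A completely different approach is to use MPI one-sided communication (see, e.g., http://www.linux-mag.com/id/1793). Note, however, that passive-target communication, which is what you really want here, is fairly tricky (although pairwise synchronization with mpi_win_post and mpi_win_start is easier), and the one-sided functionality is expected to change substantially in MPI-3, so I'm not sure how far down that road I would advise you to go.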

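More directly related to what you first tried here: rather than cancelling messages (which, as suggested above, is pretty drastic), it may be much simpler to just read through all the queued messages. MPI guarantees that messages from the same sender will not overtake one another. The only caveat is if you use MPI_THREAD_MULTIPLE and have multiple threads sending within one MPI task, in which case the order is poorly defined: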

#include <stdio.h>
#include <mpi.h>
#include <stdlib.h>
#include <unistd.h>
#include <math.h>

/* simulate a variable amount of work: sleep up to maxusecs microseconds */
void compute(void) {
    const int maxusecs=500;
    unsigned long sleepytime=(unsigned long)round(((float)rand()/RAND_MAX)*maxusecs);

    usleep(sleepytime);
}

int main(int argc, char** argv)
{
    int rank, size, i;
    int otherrank;
    const int niters=10;
    const int tag=5;
    double newval;
    double sentvals[niters+1];   /* each Isend gets its own buffer */
    double othernewval;
    MPI_Request reqs[niters+1];  /* one request per Isend */
    MPI_Status stat;
    int ready;
    int nreceived = 0;           /* messages received from the other rank so far */

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    if (size != 2) {
        fprintf(stderr,"This assumes 2 processes\n");
        MPI_Finalize();
        exit(-1);
    }

    otherrank = (rank == 0 ? 1 : 0);
    srand(rank);

    compute();
    newval = rank * 100. + 0;
    sentvals[0] = newval;
    MPI_Isend(&(sentvals[0]), 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &(reqs[0]));
    MPI_Recv (&othernewval, 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &stat);
    nreceived++;
    for (i=0; i<niters; i++) {

        /* drain everything already queued; messages arrive in order,
           so the last one received is the newest */
        MPI_Iprobe(otherrank, tag, MPI_COMM_WORLD, &ready, &stat);
        while (ready) {
            MPI_Recv(&othernewval, 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &stat);
            nreceived++;
            printf("%s[%d]: Reading queued data %lf:\n",
                   (rank == 0 ? "" : "\t\t\t\t"), rank, othernewval);
            MPI_Iprobe(otherrank, tag, MPI_COMM_WORLD, &ready, &stat);
        }

        printf("%s[%d]: Got data %lf, computing:\n",
               (rank == 0 ? "" : "\t\t\t\t"), rank, othernewval);
        compute();

        /* update my data */
        newval = rank * 100. + i + 1;
        printf("%s[%d]: computed %lf, sending:\n",
               (rank == 0 ? "" : "\t\t\t\t"), rank, newval);
        sentvals[i+1] = newval;
        MPI_Isend(&(sentvals[i+1]), 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &(reqs[i+1]));
    }

    /* flush: receive whatever the other rank sent that has not been read yet */
    while (nreceived < niters+1) {
        MPI_Recv(&othernewval, 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &stat);
        nreceived++;
    }
    /* complete all of our own sends before finalizing */
    MPI_Waitall(niters+1, reqs, MPI_STATUSES_IGNORE);

    MPI_Finalize();

    return 0;
}

Running this gives you the following (note that just because data is reported as sent doesn't mean it has been received at the time of printing):

[0]: Got data 100.000000, computing:
           [1]: Got data 0.000000, computing:
[0]: computed 1.000000, sending:
[0]: Got data 100.000000, computing:
           [1]: computed 101.000000, sending:
           [1]: Got data 0.000000, computing:
[0]: computed 2.000000, sending:
[0]: Got data 100.000000, computing:
           [1]: computed 102.000000, sending:
           [1]: Reading queued data 1.000000:
           [1]: Got data 1.000000, computing:
[0]: computed 3.000000, sending:
[0]: Reading queued data 101.000000:
[0]: Got data 101.000000, computing:
           [1]: computed 103.000000, sending:
           [1]: Reading queued data 2.000000:
           [1]: Got data 2.000000, computing:
[0]: computed 4.000000, sending:
           [1]: computed 104.000000, sending:
[0]: Reading queued data 102.000000:
           [1]: Reading queued data 3.000000:
           [1]: Got data 3.000000, computing:
[0]: Got data 102.000000, computing:
[0]: computed 5.000000, sending:
[0]: Reading queued data 103.000000:
[0]: Got data 103.000000, computing:
           [1]: computed 105.000000, sending:
           [1]: Reading queued data 4.000000:
           [1]: Got data 4.000000, computing:
[0]: computed 6.000000, sending:
[0]: Reading queued data 104.000000:
[0]: Got data 104.000000, computing:
           [1]: computed 106.000000, sending:
           [1]: Reading queued data 5.000000:
           [1]: Got data 5.000000, computing:
[0]: computed 7.000000, sending:
[0]: Reading queued data 105.000000:
[0]: Got data 105.000000, computing:
           [1]: computed 107.000000, sending:
           [1]: Reading queued data 6.000000:
           [1]: Got data 6.000000, computing:
[0]: computed 8.000000, sending:
[0]: Reading queued data 106.000000:
[0]: Got data 106.000000, computing:
           [1]: computed 108.000000, sending:
           [1]: Reading queued data 7.000000:
           [1]: Got data 7.000000, computing:
[0]: computed 9.000000, sending:
[0]: Reading queued data 107.000000:
[0]: Got data 107.000000, computing:
           [1]: computed 109.000000, sending:
           [1]: Reading queued data 8.000000:
           [1]: Got data 8.000000, computing:
[0]: computed 10.000000, sending:
           [1]: computed 110.000000, sending:

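Note that this is only demonstration code: a production version needs to wait on (MPI_Waitall) every pending send request and to drain any messages still in flight before calling MPI_Finalize.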


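Do your environment and MPI distribution support multithreading? If so, you could create a thread in P1 that computes the values and stores each iteration's result in a variable shared with P1's main thread (write-protected by a semaphore). Then, as suszterpatt suggested above, have P2 send an "I'm ready" message to P1, and have P1 respond with the value from the most recent iteration.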