2016-03-18 46 views
4

如何修改以下代碼(改編自http://materials.jeremybejarano.com/MPIwithPython/pointToPoint.html),以便每個comm.Send實例都被root = 0收到並打印輸出。此時,只收到第一個發送命令。使用mpi4py接收多個發送命令

#passRandomDraw.py 
import numpy 
from mpi4py import MPI 
from mpi4py.MPI import ANY_SOURCE 
import numpy as np 

comm = MPI.COMM_WORLD 
rank = comm.Get_rank() 

if rank == 0: 
    randNum = numpy.zeros(1) 
    print "Process before receiving random numbers" 


else: 
    for i in range(0,np.random.randint(1,10),1): 
     randNum = numpy.zeros(1) 
     randNum = numpy.random.random_sample(1) 
     print "Process", rank, "iteration", i, "drew the number", randNum[0] 
     comm.Send(randNum, dest=0) 


if rank == 0: 
    comm.Recv(randNum, ANY_SOURCE) 
    print "Process", rank, "received the number", randNum[0] 
+0

只是爲了澄清/確認。我是否正確地假設你沒有機會事先知道每個級別將發送多少消息,即使是在等級本身上? – Zulan

+0

理想情況下,我想從每個等級發送消息,而不必知道等級將發送多少消息。如果這是不可能的,則可以在等級內計算將發送多少消息,但這對每個等級都是不同的。 – 218

回答

3

如果您不知道要發送多少條消息,則必須引入標記消息結束的消息。您可以通過使用特殊標籤來使用此功能。爲了避免終止消息提供不匹配緩衝區,可以使用probe檢查什麼樣的信息在

tag_data = 42 
tag_end = 23 

if rank == 0: 
    randNum = numpy.zeros(1) 
    print "Process before receiving random numbers" 
else: 
    for i in range(0,np.random.randint(1,10),1): 
     randNum = numpy.zeros(1) 
     randNum = numpy.random.random_sample(1) 
     print "Process", rank, "iteration", i, "drew the number", randNum[0] 
     comm.Send(randNum, dest=0, tag=tag_data) 
    # send the termination message. Using the lower-case interface is simpler 
    comm.send(None, dest=0, tag=tag_end) 

if rank == 0: 
    # For debugging it might be better to use a list of still active procsses 
    remaining = comm.Get_size() - 1 
    while remaining > 0: 
     s = MPI.Status() 
     comm.Probe(status=s) 
     # make sure we post the right kind of message 
     if s.tag == tag_data: 
      comm.Recv(randNum, s.source, tag=tag_data) 
      print "Process ", s.source, " received the number", randNum[0] 
     elif s.tag == tag_end: 
      # don't need the result here 
      print "Process ", rank, " is done" 
      comm.recv(source=s.source, tag=tag_end) 
      remaining -= 1 

快到這方面有很多變化。例如,如果您知道消息是最後一條消息,則可以合併終止消息。

+0

目前,我在'comm.probe(status = s)'這一行失敗,錯誤爲'comm.probe(status = s)AttributeError:'mpi4py.MPI.Intracomm'對象沒有屬性'probe'' 。另外,對於您選擇的標籤號碼還是有意義的,還是僅僅需要將'tag_end'分開標識爲'tag_data',並且任何數字都可以?或者某些數字是否保留給特定的進程? – 218

+0

也許'probe'是在mpi4py的2.0.0版本中引入的。用大寫的'Probe'代替,應該沒有區別。標籤值完全是任意的,他們只需要有區別。我的例子中的實際值是對[42]的引用(https://en.wikipedia.org/wiki/42_%28number%29#Hitchhiker.27s_Guide_to_the_Galaxy)和[23](https://en.wikipedia.org/維基/ 23_%28film%29),儘管我沒有看到後者。 – Zulan

+0

看來'comm.probe'確實只在mpi4py版本2.0.0中出現。我有一個'python3'安裝使用這個和您發佈的原始代碼工作(顯然修改了'print'語句)。但是,使用舊版本的mpi4py'comm.Probe'會導致進程掛起。在print'Process'行中,rank,「done」,總是打印出rank = 0。大概用s.source替換'rank'會打印剛剛結束併發送了'tag_end'的'rank'? – 218

1

如果每個過程知道要發送的消息的數量,下面的步驟可以被設計爲解決該問題:

1)減少消息的數量將被髮送到根的過程。每個進程都會向根發送稍後將發送的消息數。此操作被稱爲還原,它可以由函數comm.reduce(...)

2)來執行接收所有工藝0

在這裏,消息是基於你應該做的伎倆的碼。它可以通過mpirun -np 4 python main.py

#passRandomDraw.py 
import numpy 
from mpi4py import MPI 
from mpi4py.MPI import ANY_SOURCE 
import numpy as np 

comm = MPI.COMM_WORLD 
rank = comm.Get_rank() 
size = comm.Get_size() 

#just in case, if numpy.random is seed with 
np.random.seed(np.random.randint(np.iinfo(np.uint32).min,np.iinfo(np.uint32).max)+rank) 

if rank == 0: 
    randNum = numpy.zeros(1) 
    print "Process before receiving random numbers" 
    nb=np.empty((1,),dtype=int) 
    nb0=np.zeros((1,),dtype=int) 
    comm.Reduce([nb0, MPI.INT],[nb, MPI.INT],op=MPI.SUM, root=0) #sums the total number of random number from every process on rank 0, in nb. 
    #print "rank"+str(rank)+" nb "+str(nb) 
else: 
    nb=np.empty((1,),dtype=int) 
    nb[0]=np.random.randint(1,10) 
    #print "rank"+str(rank)+" nb "+str(nb) 
    comm.Reduce([nb, MPI.INT],None,op=MPI.SUM, root=0) 
    for i in range(0,nb[0],1): 
     randNum = numpy.zeros(1) 
     randNum = numpy.random.random_sample(1) 
     print "Process", rank, "iteration", i, "drew the number", randNum[0] 
     comm.Send(randNum, dest=0) 



if rank == 0: 
    for i in range(nb[0]): #receives nb message, each one with its int. 
     comm.Recv(randNum, ANY_SOURCE) 
     print "Process", rank, "received the number", randNum[0] 

跑出按照documentation of numpy.random() Mersenne扭曲僞隨機數生成器最初由從/dev/urandom提取的號碼(或Windows類似物)接種如果可用或種子從否則時鐘。因此,在最後一種情況下,所有進程都可以接收相同的種子並生成相同的隨機數。爲防止這種情況發生,我添加了以下行:

np.random.seed(np.random.randint(np.iinfo(np.uint32).min,np.iinfo(np.uint32).max)+rank)