2016-05-22 12 views
1

好吧,所以我想在樹狀結構中進行多線程深度優先搜索。我正在使用羣集中多臺計算機的線程(本例爲本地主機四核和覆盆子pi 2)。主線程應該啓動進程,並在樹的第一次分裂中,對於分裂的每個節點,它應該產生一個新線程。這些線程應該能夠將他們的發現報告給主人。MPI在Python中使用動態生成的深度優先搜索

我試圖動態地做到這一點,而不是提供mpiexec與一些線程,因爲我不知道什麼樹將事先看起來像(例如可能有2或9分裂)。

我從我正在爲這個問題工作的項目中做了一個示例,我按如下方式工作。它從一串數字中取出一位數字,併爲每個數字產生一個線索並將數字發送到該線索。

對於主機:

#!/usr/bin/python 
from mpi4py import MPI 
import datetime, sys, numpy, time 

################ Set up MPI variables ################ 

comm = MPI.COMM_WORLD 
rank = comm.Get_rank() 
size = comm.Get_size() 
name = MPI.Get_processor_name() 
status = MPI.Status() 

################ Master code ################ 

script = 'cpi.py' 
for d in '34': 
    try: 
     print 'Trying to spawn child process...' 
     icomm = MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0) 
     spawnrank = icomm.Get_rank() 
     icomm.send(d, dest=spawnrank, tag=11) 
     print 'Spawned rank %d.' % spawnrank  
    except: ValueError('Spawn failed to start.') 

solved = False 
while solved == False: 
    #while not comm.Iprobe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG): 
    # print 'spawns doing some work...' 
    # time.sleep(1) 
solved = comm.recv(source=MPI.ANY_SOURCE, tag=22) 
print 'received solution: %d' % solved 

它正確地產卵的工人,他們得到的數字,但不要把它回主。對工人的代碼如下:

工人

#!/usr/bin/python 
from mpi4py import MPI 
import datetime, sys, numpy 

################ Set up MPI variables ################ 

icomm = MPI.Comm.Get_parent() 
comm = MPI.COMM_WORLD 
irank = comm.Get_rank() 
rank = comm.Get_rank() 

running = True 
while running: 
    data = None 
    data = icomm.recv(source=0, tag=11) 
    if data: 
     print 'Trying to send %s from worker rank %d to %d' % (data, rank, irank) 
     icomm.send(data, dest=0, tag=22) 
     break 
print 'Worker on rank %d done.' % rank 
icomm.Disconnect() 

它永遠不會到達主代碼的最後一行。我還在主代碼中添加了(註釋掉)一個探測器,以檢查標記爲22的消息是否掛在某處,排除了recv函數中的錯誤,但探測器從未找到該消息。所以我認爲它永遠不會被髮送。

我想通過打印這兩個進程的排名他們都使用排名0這是有道理的,因爲它們是在同一臺計算機上產生的。但後來當我添加一個HOSTFILE和rankfile,試圖迫使它使用不同的計算機的奴隸,它給了我下面的錯誤:

[hch-K55A:06917] *** Process received signal *** 
[hch-K55A:06917] Signal: Segmentation fault (11) 
[hch-K55A:06917] Signal code: Address not mapped (1) 
[hch-K55A:06917] Failing at address: 0x3c 
[hch-K55A:06917] [ 0] /lib/x86_64-linux-gnu/libpthread.so.0(+0x10340) [0x7f2c0d864340] 
[hch-K55A:06917] [ 1] /usr/lib/openmpi/lib/openmpi/mca_rmaps_rank_file.so(orte_rmaps_rank_file_lex+0x4a0) [0x7f2c0abdcb70] 
[hch-K55A:06917] [ 2] /usr/lib/openmpi/lib/openmpi/mca_rmaps_rank_file.so(+0x23ac) [0x7f2c0abda3ac] 
[hch-K55A:06917] [ 3] /usr/lib/libopen-rte.so.4(orte_rmaps_base_map_job+0x2e) [0x7f2c0dacd05e] 
[hch-K55A:06917] [ 4] /usr/lib/libopen-rte.so.4(orte_plm_base_setup_job+0x5a) [0x7f2c0dac580a] 
[hch-K55A:06917] [ 5] /usr/lib/openmpi/lib/openmpi/mca_plm_rsh.so(orte_plm_rsh_launch+0x338) [0x7f2c0b80a8c8] 
[hch-K55A:06917] [ 6] /usr/lib/libopen-rte.so.4(+0x51ff4) [0x7f2c0dac3ff4] 
[hch-K55A:06917] [ 7] /usr/lib/libopen-rte.so.4(opal_event_base_loop+0x31e) [0x7f2c0dae9cfe] 
[hch-K55A:06917] [ 8] mpiexec() [0x4047d3] 
[hch-K55A:06917] [ 9] mpiexec() [0x40347d] 
[hch-K55A:06917] [10] /lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf5) [0x7f2c0d4b0ec5] 
[hch-K55A:06917] [11] mpiexec() [0x403399] 
[hch-K55A:06917] *** End of error message *** 
Segmentation fault (core dumped) 

使用的命令:程序mpiexec -np 1 --hostfile HOSTFILE - -rankfile rankfile蟒spawntest.py

HOSTFILE: 本地主機 本地主機插槽= 1 MAX-槽= 4 PI2 @ raspi2槽= 4

Rankfile: 秩0 =本地主機插槽= 1個 秩1 = pi2 @ raspi2 slot = 1-4

所以我的問題如下;我怎樣才能在主機以外的計算機上產生這些線程,同時能夠來回發送數據?

回答

3

你的主人的代碼是非常錯誤的,我覺得你對這裏發生的事情缺乏一些概念性的理解。通過MPI_COMM_SPAWN(或其mpi4py對應comm.Spawn())催生了工作

MPI進程不會成爲父母的MPI_COMM_WORLD的一部分。產生的過程形成一個完全獨立的世界溝通者,並通過對講機與父作業相互聯繫,這正是產卵返回的結果。在你的情況下,icomm = MPI.COMM_SELF.Spawn(...)是主進程中的互通器句柄。子作業中的進程使用MPI_COMM_GET_PARENT(mpi4py中的MPI.Comm.Get_parent())獲取交互通訊器句柄。既然你是產卵單處理作業:

MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0) 
                ^^^^^^^^^^ 

只有一個在孩子作業的新成立的世界通信的過程,因此MPI.COMM_WORLD.Get_rank()回報每個工人爲零。

的你主人的這部分代碼是錯誤的,但由於intercommunicators是如何工作的它仍然功能:

spawnrank = icomm.Get_rank() # <--- not what you expect 
icomm.send(d, dest=spawnrank, tag=11) 

Intercommunicators鏈接的過程中兩個獨立的團體。其中一個叫做本地組,另一個叫做遠程組。在交際通訊器上使用MPI_COMM_RANKcomm.Get_rank())時,您可以在本地羣組中獲得呼叫過程的等級。雖然在發送或接收時,指定的等級涉及遠程組。在你的情況,產卵在以下intercommunicator一個新的工作成果:

mastet's MPI_COMM_SELF   child's MPI_COMM_WORLD 
       |        | 
+=============|================================|=============+ 
| +----------V----------+  +-------------V----------+ | 
| | group of the master |  | group of the child job | | 
| |  [ 0 ]  |  |   [ 0 ]   | | 
| +---------------------+  +------------------------+ | 
|     intercommunicator      | 
+============================================================+ 

(上面顯示的傳播者其中每組從何而來,傳播者本身並不是intercommunicator的一部分)

哪一組是本地並且這是遠程的,取決於調用進程屬於哪個組。主進程的本地組是子進程中的隊伍的遠程組,反之亦然。這裏重要的是,每個小組的等級爲0,因爲小組中至少有一個過程。你很幸運,主組只有一個進程,因此icomm.Get_rank()返回0(並且它總是返回零,因爲主控的本地組來自MPI_COMM_SELF,它始終包含一個進程),這發生在(總是)成爲遠程(兒童)組的有效排名。做正確的事情就是把消息發送到您知道遠程組中存在一個固定的等級,例如等級0

icomm = MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0) 
    icomm.send(d, dest=0, tag=11) 

(此代碼明確將排名遠程組的0,而在此之前值0只是一個幸運的巧合)

這就是說,發送部分 - 雖然不正確 - 仍然有效。接收部分沒有和有幾個原因。首先,你使用錯誤的通信器 - 從MPI_COMM_WORLD接收不起作用,因爲子進程不是它的成員。事實上,MPI中的傳播者是不可變的 - 你不能在沒有創建新的傳播者的情況下增加或刪除隊伍。你應該使用icomm來接收工人,就像你用它發送給他們一樣。現在,出現第二個問題 - 主人中的icomm被每個新的Spawn覆蓋,因此您實際上失去了與任何小孩作業通信的能力,但最後一個小孩不能通信。你需要保存一個句柄列表並附加句柄。

接收部分有點複雜。沒有MPI_ANY_COMM - 因爲他們都住在單獨的對講機中,所以你不能進行接收操作來覆蓋所有的小孩。您應該在互通話器列表上循環使用MPI_IPROBE,或者(更好地)從每個孩子開始一個非阻塞接收,然後使用MPI_WAIT_SOME(無論mpi4py等效物是什麼)。

隨着環路,主代碼應該是這個樣子(注意 - 未經測試的代碼,我沒有和/或使用mpi4py):

#!/usr/bin/python 
from mpi4py import MPI 
import datetime, sys, numpy, time 

################ Set up MPI variables ################ 

comm = MPI.COMM_WORLD 
rank = comm.Get_rank() 
size = comm.Get_size() 
name = MPI.Get_processor_name() 
status = MPI.Status() 

################ Master code ################ 

icomms = [] 
script = 'cpi.py' 
for d in '34': 
    try: 
     print 'Trying to spawn child process...' 
     icomm = MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0) 
     icomm.send(d, dest=0, tag=11) 
     icomms.append(icomm) 
     print 'Spawned a child.' 
    except: ValueError('Spawn failed to start.') 

solved = False 
while not solved and icomms: 
    for icomm in icomms: 
     if icomm.Iprobe(source=0, tag=MPI.ANY_TAG): 
      print 'A child responded...' 
      solved = icomm.recv(source=0, tag=MPI.ANY_TAG) 
      icomm.Disconnect() 
      icomms.remove(icomm) 
      if solved: break 
    if not solved: 
     print 'spawns doing some work...' 
     time.sleep(1) 
# make sure all pending sends get matched 
for icomm in icomms: 
    icomm.recv(source=0, tag=MPI.ANY_TAG) 
    icomm.Disconnect() 
print 'received solution: %d' % solved 

我希望你的想法。

此外:如果您從派生作業中產生作業,新孩子無法輕鬆建立與頂級主人的連接。爲此,您應該轉向MPI-2客戶機/服務器型號支持的一個模糊部分,並讓主機打開一個端口MPI_PORT_OPEN,然後使用MPI_PUBLISH_NAME向MPI命名服務註冊,最後使用MPI_COMM_ACCEPT接收來自任何其他MPI工作。工作人員應該使用MPI_LOOKUP_NAME獲得對該端口的引用,並使用MPI_COMM_CONNECT與主作業建立互通器。我不知道mpi4py中是否存在這些函數的包裝,如果是,那麼這些函數又是如何命名的。

+0

感謝您的精心解答,這開始變得更有意義。 – Enzime