
Along what axis does the mpi4py Scatterv function split a numpy array?

I have the following MWE that uses comm.Scatterv and comm.Gatherv to distribute a 4D array across a given number of cores (size):

import numpy as np 
from mpi4py import MPI 
import matplotlib.pyplot as plt 

comm = MPI.COMM_WORLD 
size = comm.Get_size() 
rank = comm.Get_rank() 

if rank == 0: 
    test = np.random.rand(411,48,52,40) #Create array of random numbers 
    outputData = np.zeros(np.shape(test)) 
    split = np.array_split(test,size,axis = 0) #Split input array by the number of available cores 

    split_sizes = [] 

    for i in range(0,len(split),1): 
     split_sizes = np.append(split_sizes, len(split[i])) 

    displacements = np.insert(np.cumsum(split_sizes),0,0)[0:-1] 

    plt.imshow(test[0,0,:,:]) 
    plt.show() 

else: 
#Create variables on other cores 
    split_sizes = None 
    displacements = None 
    split = None 
    test = None 
    outputData = None 

#Broadcast variables to other cores 
test = comm.bcast(test, root = 0) 
split = comm.bcast(split, root=0) 
split_sizes = comm.bcast(split_sizes, root = 0) 
displacements = comm.bcast(displacements, root = 0) 

output_chunk = np.zeros(np.shape(split[rank])) #Create array to receive subset of data on each core, where rank specifies the core 
print("Rank %d with output_chunk shape %s" %(rank,output_chunk.shape)) 

comm.Scatterv([test,split_sizes, displacements,MPI.DOUBLE],output_chunk,root=0) #Scatter data from test across cores and receive in output_chunk 

output = output_chunk 

plt.imshow(output_chunk[0,0,:,:]) 
plt.show() 

print("Output shape %s for rank %d" %(output.shape,rank)) 

comm.Barrier() 

comm.Gatherv(output,[outputData,split_sizes,displacements,MPI.DOUBLE], root=0) #Gather output data together 

if rank == 0: 
    print("Final data shape %s" %(outputData.shape,)) 
    plt.imshow(outputData[0,0,:,:]) 
    plt.show() 

In principle, this should create a 4D array of random numbers, distribute it across size cores, and then recombine it. I expected Scatterv to divide along axis 0 (length 411) according to the starting integers and displacements in the vectors split_sizes and displacements. However, recombining with Gatherv raises an error (mpi4py.MPI.Exception: MPI_ERR_TRUNCATE: message truncated), and the plots of output_chunk on each core show that most of the input data has been lost, so it appears that the split did not occur along the first axis.

My question is: why does the split not occur along the first axis, how can I tell along which axis the split occurs, and is it possible to change/specify the axis along which it happens?


'comm.Scatterv' probably knows nothing about 'numpy' arrays, their shape, dimensions, or strides. At best it treats 'test' as a block of memory. In fact, it may just be getting a pointer to the array object rather than to its data buffer. Does this code work for a 1d array? Or for 'test.flatten()'? – hpaulj
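Along the lines of this comment, here is a minimal sketch of Scatterv on a flat 1-D buffer (the array length of 8 is illustrative, and broadcasting the counts is one common pattern, not the only one):

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    data = np.arange(8, dtype='float64') #Illustrative flat buffer
    counts = [len(c) for c in np.array_split(data, size)] #Elements per rank
    displs = np.insert(np.cumsum(counts), 0, 0)[:-1] #Start offset of each rank's block
else:
    data = None
    counts = None
    displs = None

counts = comm.bcast(counts, root=0) #Each rank needs its own receive count
displs = comm.bcast(displs, root=0)
chunk = np.zeros(counts[rank]) #float64 by default, matching MPI.DOUBLE
comm.Scatterv([data, counts, displs, MPI.DOUBLE], chunk, root=0)
print("rank %d received %s" % (rank, chunk))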

Answer


comm.Scatterv and comm.Gatherv know nothing about numpy array dimensions. They simply see sendbuf as a block of memory. It is therefore necessary to take this into account when specifying sendcounts and displacements (see http://materials.jeremybejarano.com/MPIwithPython/collectiveCom.html for details). This also assumes that the data is laid out C-style (row-major) in memory.
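Applied to the question's MWE, this means the counts and displacements passed to Scatterv must be measured in MPI.DOUBLE elements, not in axis-0 slices. A hedged sketch for the (411, 48, 52, 40) array, where size is the communicator size from the surrounding code:

shape = (411, 48, 52, 40)
rows = np.array_split(np.arange(shape[0]), size) #Axis-0 rows assigned to each rank
split_sizes = np.array([len(r) for r in rows])
elements_per_slice = int(np.prod(shape[1:])) #48*52*40 contiguous doubles per axis-0 slice (C order)
split_sizes_input = split_sizes*elements_per_slice #Counts in elements, not slices
displacements_input = np.insert(np.cumsum(split_sizes_input), 0, 0)[0:-1]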

An example for a 2D matrix is given below. The key parts of the code are to set split_sizes_input/split_sizes_output and displacements_input/displacements_output correctly. The code takes the size of the second dimension into account to specify the correct divisions in the block of memory:

split_sizes_input = split_sizes*512 

For higher-dimensional arrays, this line would change to:

split_sizes_input = split_sizes*indirect_dimension_sizes 

where

indirect_dimension_sizes = npts2*npts3*npts4*....*nptsN

The same applies for split_sizes_output.
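In other words, for an N-dimensional C-ordered array split along axis 0, each rank's count is its number of axis-0 rows multiplied by the product of all trailing dimensions. A small sketch of that rule as a helper (the name scatterv_counts is my own, not part of mpi4py):

import numpy as np

def scatterv_counts(shape, size):
    #Counts/displacements in elements for Scatterv along axis 0 of a C-ordered array
    rows = [len(r) for r in np.array_split(np.arange(shape[0]), size)]
    stride = int(np.prod(shape[1:])) #npts2*npts3*...*nptsN
    counts = [r*stride for r in rows]
    displs = np.insert(np.cumsum(counts), 0, 0)[:-1]
    return counts, displs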

The code creates a 2D array with the numbers 0 to 511 incrementing across the second dimension. It is easy to see from the plots whether the data has been split and recombined correctly.

import numpy as np 
from mpi4py import MPI 
import matplotlib.pyplot as plt 

comm = MPI.COMM_WORLD 
size = comm.Get_size() 
rank = comm.Get_rank() 

if rank == 0: 
    test = np.arange(0,512,dtype='float64') 
    test = np.tile(test,[256,1]) #Create 2D input array. Numbers 0 to 511 increment across dimension 2. 
    outputData = np.zeros([256,512]) #Create output array of same size 
    split = np.array_split(test,size,axis = 0) #Split input array by the number of available cores 

    split_sizes = np.array([len(s) for s in split], dtype=int) #Axis-0 rows per rank (integer counts for Scatterv) 

    split_sizes_input = split_sizes*512 
    displacements_input = np.insert(np.cumsum(split_sizes_input),0,0)[0:-1] 

    split_sizes_output = split_sizes*512 
    displacements_output = np.insert(np.cumsum(split_sizes_output),0,0)[0:-1] 


    print("Input data split into vectors of sizes %s" %split_sizes_input) 
    print("Input data split with displacements of %s" %displacements_input) 

    plt.imshow(test) 
    plt.colorbar() 
    plt.title('Input data') 
    plt.show() 

else: 
#Create variables on other cores 
    split_sizes_input = None 
    displacements_input = None 
    split_sizes_output = None 
    displacements_output = None 
    split = None 
    test = None 
    outputData = None 

split = comm.bcast(split, root=0) #Broadcast split array to other cores 
split_sizes_input = comm.bcast(split_sizes_input, root = 0) #Broadcast sizes/displacements so every rank holds them 
displacements_input = comm.bcast(displacements_input, root = 0) 
split_sizes_output = comm.bcast(split_sizes_output, root = 0) 
displacements_output = comm.bcast(displacements_output, root = 0) 

output_chunk = np.zeros(np.shape(split[rank])) #Create array to receive subset of data on each core, where rank specifies the core 
print("Rank %d with output_chunk shape %s" %(rank,output_chunk.shape)) 
comm.Scatterv([test,split_sizes_input, displacements_input,MPI.DOUBLE],output_chunk,root=0) 

output = np.zeros([len(output_chunk),512]) #Create output array on each core 

for i in range(0,np.shape(output_chunk)[0],1): 
    output[i,0:512] = output_chunk[i] 

plt.imshow(output) 
plt.title("Output shape %s for rank %d" %(output.shape,rank)) 
plt.colorbar() 
plt.show() 

print("Output shape %s for rank %d" %(output.shape,rank)) 

comm.Barrier() 

comm.Gatherv(output,[outputData,split_sizes_output,displacements_output,MPI.DOUBLE], root=0) #Gather output data together 



if rank == 0: 
    outputData = outputData[0:len(test),:] 
    print("Final data shape %s" %(outputData.shape,)) 
    plt.imshow(outputData) 
    plt.colorbar() 
    plt.show() 
    print(outputData)
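For completeness, a script like the one above would be launched under an MPI runner, for example mpiexec -n 4 python scatterv_example.py (the script name here is illustrative); the counts and displacements then adapt automatically to whatever size that run provides.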