2015-01-13 50 views
5

OpenMPI:我想讀取根節點上的文件並將該文件的內容發送給所有其他節點。 我發現MPI_Bcast做的是:使用MPI_Bcast發送動態大小的動態數組

int MPI_Bcast(void *buffer, int count, MPI_Datatype datatype, 
    int root, MPI_Comm comm) 

,我發現所有的例子都預先知道count值,但在我的情況下,count值只有根節點知道。其他例子表示,其他節點上用同樣的MPI_Bcast調用來接收數據。

我已經添加了這一點:

// A Descriptor is a fixed-size record of 128 shorts (256 bytes).
typedef short Descriptor[128];
MPI_Datatype descriptorType;
// BUG FIX: sizeof(Descriptor) counts BYTES (256), so the unit type of the
// contiguous datatype must be MPI_CHAR, not MPI_SHORT. With MPI_SHORT the
// committed type described 256 shorts (512 bytes) — twice the real size of
// a Descriptor — so the broadcast wrote past the end of every receive
// buffer (hence the malloc assertion once more ranks were involved).
MPI_Type_contiguous(sizeof(Descriptor), MPI_CHAR, &descriptorType);
MPI_Type_commit(&descriptorType);



if(world_rank == 0) {
    struct stat finfo;

    // Only the root has the file, so only the root can compute the count.
    if(stat(argv[1], &finfo) == 0) {
     querySize = finfo.st_size/sizeof(Descriptor);
    }

{
    //read binary query
    queryDescriptors = new Descriptor[querySize];
    fstream qFile(argv[1], ios::in | ios::binary);
    qFile.read((char*)queryDescriptors, querySize*sizeof(Descriptor));
    qFile.close();

    }
}

    // First broadcast the element count so non-root ranks can allocate...
    MPI_Bcast((void*)&querySize, 1, MPI_INT, 0, MPI_COMM_WORLD);
    if (world_rank != 0)
    {
     queryDescriptors = new Descriptor[querySize];
    }
    // ...then broadcast the payload itself, one descriptorType per element.
    MPI_Bcast((void*)queryDescriptors, querySize, descriptorType, 0, MPI_COMM_WORLD);

當我這樣調用它:mpirun -np 2 ./mpi_hello_world 時它工作正常,但是當我用超過兩個進程調用它時,我得到這個:

mpi_hello_world: malloc.c:3096: sYSMALLOc: Assertion `(old_top == (((mbinptr) (((char *) &((av)->bins[((1) - 1) * 2])) - __builtin_offsetof (struct malloc_chunk, fd)))) && old_size == 0) || ((unsigned long) (old_size) >= (unsigned long)((((__builtin_offsetof (struct malloc_chunk, fd_nextsize))+((2 * (sizeof(size_t))) - 1)) & ~((2 * (sizeof(size_t))) - 1))) && ((old_top)->size & 0x1) && ((unsigned long)old_end & pagemask) == 0)' failed. 
mpi_hello_world: malloc.c:3096: sYSMALLOc: Assertion `(old_top == (((mbinptr) (((char *) &((av)->bins[((1) - 1) * 2])) - __builtin_offsetof (struct malloc_chunk, fd)))) && old_size == 0) || ((unsigned long) (old_size) >= (unsigned long)((((__builtin_offsetof (struct malloc_chunk, fd_nextsize))+((2 * (sizeof(size_t))) - 1)) & ~((2 * (sizeof(size_t))) - 1))) && ((old_top)->size & 0x1) && ((unsigned long)old_end & pagemask) == 0)' failed. 
+2

所以發出兩個廣播:第一個發送計數,第二個發送緩衝區的內容。 –

+0

你是對的,那是一個解決方案。我想知道在這種情況下MPI是否有一種機制。 – AlexandruC

+0

不是我所知道的,但是我的MPI變得有點生疏了。 –

回答

2

如果qFile.read(...)未包含在if(rank==0){}測試中,則所有進程都將讀取該文件。並且在0以外的進程上,queryDescriptors = new Descriptor[querySize];應在第一個MPI_Bcast()之後調用:在那之前,querySize對這些進程沒有意義。

進程0必須:

  • 讀取的項目數
  • 分配
  • 讀陣列
  • 廣播項目
  • 廣播陣列

其他進程必須:

  • 接收的項目數
  • 分配
  • 接收陣列

下面是一個如何讀取浮點數陣列並使用動態分配的示例:

#include <stdio.h>
#include <iostream>
#include <fstream>

#include <mpi.h>
using namespace std;

// Demonstrates broadcasting a dynamically-sized float array:
// rank 0 writes a test file, reads it back, then broadcasts the
// item count first so other ranks can allocate, and the data second.
int main (int argc, char *argv[])
{
    int rank;
    int size;

    MPI_Init(&argc, &argv);

    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if(rank == 0)
    {
     //creating the file: an int item count followed by that many floats
     ofstream myfile;
     myfile.open ("example.txt", ios::out |ios::binary);
     if(!myfile.is_open()){
      // FIX: the original silently continued if the file could not be
      // created; abort all ranks instead of broadcasting garbage later.
      cout<<"cannot create example.txt"<<endl;
      MPI_Abort(MPI_COMM_WORLD, 1);
     }
     int nbitem=42;
     myfile.write((char*)&nbitem,sizeof(int));

     float a=0;
     for(int i=0;i<nbitem;i++){
      myfile.write((char*)&a,sizeof(float));
      a+=2;
     }
     myfile.close();
    }


    //now reading the file
    int nbitemread=0;
    float* buffer=NULL;   // allocated once the item count is known
    if(rank==0){
     ifstream file ("example.txt", ios::in |ios::binary);
     if(!file.is_open()){
      // FIX: fail loudly rather than reading from a bad stream.
      cout<<"cannot open example.txt"<<endl;
      MPI_Abort(MPI_COMM_WORLD, 1);
     }
     file.read ((char*)&nbitemread, sizeof(int));
     buffer=new float[nbitemread];
     file.read ((char*)buffer,nbitemread* sizeof(float));
     file.close();
     //communication: count first, then payload
     MPI_Bcast(&nbitemread, 1, MPI_INT, 0, MPI_COMM_WORLD);
     MPI_Bcast(buffer, nbitemread, MPI_FLOAT, 0, MPI_COMM_WORLD);
    }else{
     //receive the count, allocate, then receive the payload
     MPI_Bcast(&nbitemread, 1, MPI_INT, 0, MPI_COMM_WORLD);
     //nbitemread is meaningfull now
     buffer=new float[nbitemread];
     MPI_Bcast(buffer, nbitemread, MPI_FLOAT, 0, MPI_COMM_WORLD);

    }

    //printing... (FIX: guard the index — buffer[0] of a zero-length
    //array would be out of bounds if the file were empty)
    if(nbitemread > 0)
     cout<<"on rank "<<rank<<" rode "<<buffer[nbitemread/2]<<" on position "<<nbitemread/2<<endl;

    delete[] buffer;
    MPI_Finalize();

    return 0;
}

用 mpiCC main.cpp -o main 編譯,並用 mpirun -np 2 main 運行

您代碼中的另一個問題是MPI_Type_contiguous(sizeof(Descriptor), MPI_SHORT, &descriptorType);。它應該是MPI_Type_contiguous(sizeof(Descriptor), MPI_CHAR, &descriptorType); 下面這段基於你的代碼的程序應該可以解決問題:

#include <stdio.h>
#include <iostream>
#include <fstream>

#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>

#include <mpi.h>
using namespace std;

// Reads a binary file of fixed-size descriptors on rank 0 and broadcasts
// it to all ranks: count first (so other ranks can allocate), data second.
int main (int argc, char *argv[])
{
    int world_rank;
    int size;

    MPI_Init(&argc, &argv);

    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    // FIX: querySize was uninitialized; if stat() failed on rank 0, a
    // garbage count was broadcast to every rank. Start well-defined.
    int querySize = 0;

    // FIX: a query file path is required as the first argument.
    if(argc < 2){
     if(world_rank == 0) cout<<"usage: program <query file>"<<endl;
     MPI_Finalize();
     return 1;
    }

    // One Descriptor = 128 shorts = 256 bytes. sizeof(Descriptor) counts
    // bytes, so the contiguous datatype is built from MPI_CHAR.
    typedef short Descriptor[128];
    MPI_Datatype descriptorType;
    MPI_Type_contiguous(sizeof(Descriptor), MPI_CHAR, &descriptorType);
    MPI_Type_commit(&descriptorType);


    Descriptor* queryDescriptors = NULL;


    // Only rank 0 touches the file: size it, allocate, read it in.
    if(world_rank == 0) {
     struct stat finfo;

     if(stat(argv[1], &finfo) == 0) {
      cout<<"st_size "<<finfo.st_size<<" descriptor "<<sizeof(Descriptor)<< endl;
      querySize = finfo.st_size/sizeof(Descriptor);
      cout<<"querySize "<<querySize<<endl;
     }else{
      cout<<"stat error"<<endl;
     }

     {
      //read binary query
      queryDescriptors = new Descriptor[querySize];
      fstream qFile(argv[1], ios::in | ios::binary);
      qFile.read((char*)queryDescriptors, querySize*sizeof(Descriptor));
      qFile.close();

     }
    }

    // Broadcast the element count so non-root ranks can allocate...
    MPI_Bcast((void*)&querySize, 1, MPI_INT, 0, MPI_COMM_WORLD);
    if (world_rank != 0)
    {
     queryDescriptors = new Descriptor[querySize];
    }
    // ...then broadcast the descriptors themselves.
    MPI_Bcast((void*)queryDescriptors, querySize, descriptorType, 0, MPI_COMM_WORLD);

    // FIX: guard the sample print — with querySize == 0 (stat failure or
    // empty file) the original indexed a zero-length allocation.
    if(querySize > 0)
     cout<<"on rank "<<world_rank<<" rode "<<queryDescriptors[querySize/2][12]<<" on position "<<querySize/2<<endl;

    delete[] queryDescriptors;

    // FIX: release the committed datatype before shutting MPI down.
    MPI_Type_free(&descriptorType);
    MPI_Finalize();

    return 0;
}
+0

很抱歉沒有提到,但我基本上就是這樣做的。我使用 mpiC++ 編譯並用 mpirun -np 3 main 運行 – AlexandruC

+0

我在我的問題中添加了更多代碼。 Vive la France! – AlexandruC

+0

我想知道這是否是一個問題,因爲我在單個節點上運行這個問題? – AlexandruC