2015-09-05 60 views
2

我有一個MPI程序,用於從包含文件名列表的文件中讀取多個進程,並基於讀取的文件名 - 它讀取相應的文件並計數單詞的頻率。當一個MPI進程執行MPI_Barrier()時,其他進程掛起

如果其中一個進程完成並返回 - 阻止執行MPI_Barrier(),則其他進程也會掛起。在調試時,可以看到函數沒有輸入process_files() readFile()函數無法找出原因。請看以下代碼:

#include <stdio.h> 
#include <stdlib.h> 
#include <mpi.h> 
#include <ctype.h> 
#include <string.h> 
#include "hash.h" 

void process_files(char*, int* , int, hashtable_t*); 

void initialize_word(char *c,int size) 
{ 
    int i; 
    for(i=0;i<size;i++) 
     c[i]=0; 

    return; 
} 



char* readFilesList(MPI_File fh, char* file,int rank, int nprocs, char* block, const int overlap, int* length) 
{ 
    char *text; 
    int blockstart,blockend; 

    MPI_Offset size; 
    MPI_Offset blocksize; 
    MPI_Offset begin; 
    MPI_Offset end; 
    MPI_Status status; 

    MPI_File_open(MPI_COMM_WORLD,file,MPI_MODE_RDONLY,MPI_INFO_NULL,&fh); 
    MPI_File_get_size(fh,&size); 

    /*Block size calculation*/ 
    blocksize = size/nprocs; 
    begin = rank*blocksize; 
    end = begin+blocksize-1; 

    end+=overlap; 

    if(rank==nprocs-1) 
     end = size; 

    blocksize = end-begin+1; 

    text = (char*)malloc((blocksize+1)*sizeof(char)); 
    MPI_File_read_at_all(fh,begin,text,blocksize,MPI_CHAR, &status); 
    text[blocksize+1]=0; 

    blockstart = 0; 
    blockend = blocksize; 

    if(rank!=0) 
    { 
     while(text[blockstart]!='\n' && blockstart!=blockend) blockstart++; 
     blockstart++; 
    } 

    if(rank!=nprocs-1) 
    { 

     blockend-=overlap; 
     while(text[blockend]!='\n'&& blockend!=blocksize) blockend++; 
    } 



    blocksize = blockend-blockstart; 

    block = (char*)malloc((blocksize+1)*sizeof(char)); 
    block = memcpy(block, text + blockstart, blocksize); 
    block[blocksize]=0; 
    *length = strlen(block); 

    MPI_File_close(&fh); 
    return block; 
} 

void calculate_term_frequencies(char* file, char* text, hashtable_t *hashtable,int rank) 
{ 
    printf("Start File %s, rank %d \n\n ",file,rank); 
    fflush(stdout); 
    if(strlen(text)!=0||strlen(file)!=0) 
    { 

     int i,j; 
     char w[100]; 
     i=0,j=0; 
     while(text[i]!=0) 
     { 
      if((text[i]>=65&&text[i]<=90)||(text[i]>=97&&text[i]<=122)) 
      { 
       w[j]=text[i]; 
       j++; i++; 
      } 

      else 
      { 

       w[j] = 0; 
       if(j!=0) 
       { 
        //ht_set(hashtable, strcat(strcat(w,"#"),file),1); 
       } 
       j=0; 
       i++; 
       initialize_word(w,100); 
      } 

     } 
    } 
    return; 
} 

void readFile(char* filename, hashtable_t *hashtable,int rank) 
{ 
    MPI_Status stat; 
    MPI_Offset size; 
    MPI_File fx; 
    char* textFromFile=0; 

    printf("Start File %d, rank %d \n\n ",strlen(filename),rank); 
    fflush(stdout); 

    if(strlen(filename)!=0) 
    { 
     MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_RDONLY,MPI_INFO_NULL,&fx); 
     MPI_File_get_size(fx,&size); 

     printf("Start File %s, rank %d \n\n ",filename,rank); 
     fflush(stdout); 

     textFromFile = (char*)malloc((size+1)*sizeof(char)); 
     MPI_File_read_at_all(fx,0,textFromFile,size,MPI_CHAR, &stat); 
     textFromFile[size]=0; 
     calculate_term_frequencies(filename, textFromFile, hashtable,rank); 

     MPI_File_close(&fx); 

    } 

    printf("Done File %s, rank %d \n\n ",filename,rank); 
    fflush(stdout); 
    return; 
} 

void process_files(char* block, int* length, int rank,hashtable_t *hashtable) 
{ 

    char s[2]; 
    s[0] = '\n'; 
    s[1] = 0; 

    char *file; 
    if(*length!=0) 
    { 
     /* get the first file */ 
     file = strtok(block, s); 

     /* walk through other tokens */ 
     while(file != NULL) 
     { 
      readFile(file,hashtable,rank); 
      file = strtok(NULL, s); 
     } 
    } 
    return; 
} 

void execute_process(MPI_File fh, char* file, int rank, int nprocs, char* block, const int overlap, int * length, hashtable_t *hashtable) 
{ 

    block = readFilesList(fh,file,rank,nprocs,block,overlap,length); 
    process_files(block,length,rank,hashtable); 
} 


int main(int argc, char *argv[]){ 

    /*Initialization*/ 
    MPI_Init(&argc, &argv); 
    MPI_File fh=0; 
    int rank,nprocs,namelen; 
    char *block=0; 
    const int overlap = 70; 
    char* file = "filepaths.txt"; 
    int *length = (int*)malloc(sizeof(int)); 

    hashtable_t *hashtable = ht_create(65536); 

    MPI_Comm_rank(MPI_COMM_WORLD, &rank); 
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs); 

    char processor_name[MPI_MAX_PROCESSOR_NAME]; 
    MPI_Get_processor_name(processor_name, &namelen); 
    printf("Rank %d is on processor %s\n",rank,processor_name); 
    fflush(stdout); 

    execute_process(fh,file,rank,nprocs,block,overlap,length,hashtable); 

    printf("Rank %d returned after processing\n",rank); 
    MPI_Barrier(MPI_COMM_WORLD); 

    MPI_Finalize(); 
    return 0; 

} 

的filepaths.txt是包含普通文本文件的絕對文件名的文件:

如:

/home/mpiuser/mpi/MPI_Codes/code/test1.txt 
/home/mpiuser/mpi/MPI_Codes/code/test2.txt 
/home/mpiuser/mpi/MPI_Codes/code/test3.txt 
+0

這個readFilesList看起來相當複雜,你確定它在那裏生成正確的塊大小嗎?我認爲你不會從並行化這部分代碼中獲得很多收益。讀取單個文本文件(與實際數據相比,該文件相對較小,您希望從這些文件讀取)在單個進程中更容易完成,甚至可能更快。因此,我會在單個進程中讀取該列表並進行廣播或分散結果列表的文件。 – haraldkl

+0

在我看來,隨後你繼續讓每個進程讀取其中一個文件,而不是所有的讀取部分。如果是這種情況,您不要在這裏使用MPI_IO! MPI_read_all操作要求所有進程參與此文件的調用。 – haraldkl

+1

readFilesList正在生成文件列表的非重疊塊。但是,我會嘗試單個進程的建議來讀取它,並使用scatter將它們分配給進程。 – Dhanashree

回答

0

你readFilesList功能是非常令人迷惑,我相信它沒有做你想做的事情,但也許我不明白它的正確性。我相信它應該從每個進程的列表文件中收集一大堆文件名。每個過程都有不同的設置。它並沒有這樣做,但這不是問題,即使這樣做會達到你想要的效果,後續的MPI IO也無法工作。

讀取文件時,使用MPI_File_read_all和MPI_COMM_WORLD作爲通信器。這需要所有進程參與閱讀這個文件。現在,如果每個進程都應該讀取不同的文件,這顯然不會起作用。

所以你的實現有幾個問題,但我不能真正解釋你描述的行爲,我寧願先開始並嘗試修復它們,然後再詳細調試,發生什麼問題。

我的印象,你想有沿着這些線路的算法:

  • 讀取文件名列表

  • 分發文件,這些文件列表同樣給所有進程

  • 讓每個進程在其自己的一組文件上工作

  • 使用來自thi的加工

而且我建議用下面的方法來試試這個:

  • 閱讀單個進程的列表(無MPI IO)

  • 分散的文件列表到所有進程,以便所有工作都可以解決相同數量的工作

  • 讓每個進程獨立並以串行方式工作在其文件列表上(串行文件根據需要

我相信訪問和處理)

  • 與MPI一些數據減少,這將是您的情況最好的(最簡單和最快)的策略。請注意,根本不涉及MPI IO。我不認爲在第一步中對文件列表進行一些複雜的分佈式讀取會導致這方面的任何優勢,並且在實際的處理過程中它實際上是有害的。您的流程越獨立,通常您的可擴展性越好。

  • +0

    我想我已經在一定程度上誤解了MPI-IO,並將其用於完全獨立的文件處理。我將readFiles()中使用MPI-IO的代碼部分改爲串行文件訪問,並且工作正常。感謝您的詳細解釋。 – Dhanashree

    +0

    MPi-IO方法仍然可以與MPI_COMM_SELF一起使用。例如,您不再獲得集體I/O優化,但您確實在程序和底層文件系統之間獲得了抽象層 - 例如,您可能希望在Windows和Unix之間進行移植。 –