0

我有一組文件說10個文件和一個大文件,這是所有10個文件的總和。閱讀許多文件hadoop mapreduce分佈式緩存

我把它們放在分佈式緩存中,作業conf。

當我看到他們在減少,我遵守以下的事情:

  1. 我讀這是在分佈式緩存添加在減少方法只選定的文件。我預計速度會更快,因爲在每個縮減中讀取的文件大小與在所有縮小方法中讀取大型文件相比較小。但是,速度較慢。

  2. 此外,當我將它分割成更小的文件並將它們添加到分佈式緩存時,問題變得更糟。工作本身在很長一段時間纔開始運行。

我無法找到原因。請幫助。

回答

3

我認爲你的問題在於閱讀reduce()中的文件。您應該閱讀configure()(使用舊API)或setup()(使用新API)中的文件。因此,對於每一個減速將只讀取一次,而不是讀它爲每個輸入組到減速機(基本上,每次調用減少方法)

您可以編寫類似: 使用新的MapReduce API(ORG .apache.hadoop.mapreduce *) -

public static class ReduceJob extends Reducer<Text, Text, Text, Text> { 

    ... 
Path file1; 
Path file2; 
... 

    @Override 
      protected void setup(Context context) throws IOException, InterruptedException { 

       // Get the file from distributed cached 
    file1 = DistributedCache.getLocalCacheFiles(context.getConfiguration())[0]; 
    file2 = DistributedCache.getLocalCacheFiles(context.getConfiguration())[1]; 

       // parse the file and get it's data in-memory for use in reduce method, probably in some ArrayList or HashMap. 
      } 



      @Override 
      protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, 
        InterruptedException { 
    ... 
    } 
    } 

使用舊的mapred API(org.apache.hadoop.mapred *) -

public static class ReduceJob extends MapReduceBase implements Reducer<Text, Text, Text, Text> { 

    ... 
Path file1; 
Path file2; 
... 

     @Override 
     public void configure(JobConf job) { 

       // Get the file from distributed cached 
    file1 = DistributedCache.getLocalCacheFiles(job)[0] 
    file2 = DistributedCache.getLocalCacheFiles(job)[1] 
... 

       // parse the file and get it's data in-memory for use in reduce method, probably in some ArrayList or HashMap. 
      } 


@Override 
     public synchronized void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, 
       Reporter reporter) throws IOException { 
    ... 
    } 
    }