2014-02-26 50 views
0

我正在討論Mapreduce模型的Hadoop框架,並且實際嘗試了像WordCount,Max_temperature這樣的基本示例,以便爲我的項目創建一個mapreduce任務。我只想知道如何處理單詞計數爲每個文件輸入一個輸出文件......在讓我給你一個例子: -多輸入文件Mapreduce Wordcount示例單獨完成

FILE_1 Dog Cat Dog Bull 
FILE_2 Cow Ox Tiger Dog Cat 
FILE_3 Dog Cow Ox Tiger Bull 

應該給3個輸出文件,1爲每個輸入文件如下: -

Out_1 Dog 2,Cat 1,Bull 1 
Out_2 Cow 1,Ox 1,Tiger 1,Dog 1,Cat 1 
Out_3 Dog 1,Cow 1,Ox 1,Tiger 1,Bull 1 

我經歷了這裏發佈的答案Hadoop MapReduce - one output file for each input但無法正確掌握。

請幫忙!感謝

+0

你無法正確把握哪部分? –

+0

若昂,如什麼是在減速功能對應的代碼,本身訪問值(從映射器)seperately爲每個輸入文件?總之,請註明該鏈接減速代碼也一樣,這將會是有益的 – khanna

+0

的可能重複的[Hadoop的MapReduce的 - 對於每個輸入的一個輸出文件(http://stackoverflow.com/questions/8886285/hadoop-mapreduce-one-output-file-for-each-input) –

回答

0

每個減速器輸出一個輸出文件。 輸出文件的數量取決於減速器的數量。 (A) 假設您想要在單個MapReduce作業中處理所有三個輸入文件。

最起碼 - 您必須將減速器的數量設置爲您想要的輸出文件的數量。

既然你正在嘗試做的字計數每個文件。而不是跨文件。 您必須確保所有文件內容(一個文件)由單個Reducer處理。使用自定義分區程序是執行此操作的一種方法。

(B) 另一種方法是簡單地運行您的MapReduce作業三次。一次輸入每個輸入文件。並有減速算作1

0

即使我在Hadoop的一個新手,發現這個問題很有意思。這就是我解決這個問題的方法。

public class Multiwordcnt { 

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { 

      Configuration conf = new Configuration(); 
      Job myJob = new Job(conf, "Multiwordcnt"); 
      String[] userargs = new GenericOptionsParser(conf, args).getRemainingArgs(); 

      myJob.setJarByClass(Multiwordcnt.class); 
      myJob.setMapperClass(MyMapper.class); 
      myJob.setReducerClass(MyReducer.class);  
      myJob.setMapOutputKeyClass(Text.class); 
      myJob.setMapOutputValueClass(IntWritable.class); 

      myJob.setOutputKeyClass(Text.class); 
      myJob.setOutputValueClass(IntWritable.class); 

      myJob.setInputFormatClass(TextInputFormat.class); 
      myJob.setOutputFormatClass(TextOutputFormat.class); 

      FileInputFormat.addInputPath(myJob, new Path(userargs[0])); 
      FileOutputFormat.setOutputPath(myJob, new Path(userargs[1])); 

      System.exit(myJob.waitForCompletion(true) ? 0 : 1); 
    } 

    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> { 

     Text emitkey = new Text(); 
     IntWritable emitvalue = new IntWritable(1); 

     public void map(LongWritable key , Text value, Context context) throws IOException, InterruptedException { 

      String filePathString = ((FileSplit) context.getInputSplit()).getPath().toString();      
      String line = value.toString(); 
      StringTokenizer tokenizer = new StringTokenizer(line); 
      while (tokenizer.hasMoreTokens()){ 

       String filepathword = filePathString + "*" + tokenizer.nextToken(); 
       emitkey.set(filepathword); 
       context.write(emitkey, emitvalue); 
      }   
     } 
    } 

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 
     Text emitkey = new Text(); 
     IntWritable emitvalue = new IntWritable(); 
     private MultipleOutputs<Text,IntWritable> multipleoutputs; 

     public void setup(Context context) throws IOException, InterruptedException { 
      multipleoutputs = new MultipleOutputs<Text,IntWritable>(context); 
     }   

     public void reduce(Text key , Iterable <IntWritable> values, Context context) throws IOException, InterruptedException { 
      int sum = 0; 

      for (IntWritable value : values){ 
       sum = sum + value.get(); 
      } 
      String pathandword = key.toString(); 
      String[] splitted = pathandword.split("\\*"); 
      String path = splitted[0]; 
      String word = splitted[1];    
      emitkey.set(word); 
      emitvalue.set(sum); 
      System.out.println("word:" + word + "\t" + "sum:" + sum + "\t" + "path: " + path); 
      multipleoutputs.write(emitkey,emitvalue , path); 
     } 

     public void cleanup(Context context) throws IOException, InterruptedException { 
      multipleoutputs.close(); 
     } 
    } 
}