2016-06-29 72 views
0

我遇到了「鏈接mapreduce工作」。作爲mapreduce的新手,可以解釋爲什麼應該或什麼時候應該或者在什麼情況下我們必須鏈接(我假設鏈接意味着依次運行mapreduce作業)作業?mapreduce工作鏈接

有沒有可以幫助的樣本?

感謝很多 納特

回答

0

簡單地說,你要鏈接多個映射精簡作業時,您的問題可能不適合在短短的一個映射精簡工作。

一個很好的例子是找到一個前10名購買的物品,這可以通過2周的工作來實現的:

  1. 的地圖減少工作,找到每個項目多少時間買。

  2. 第二份工作,根據購買次數對商品進行分類,並獲得前10名商品。

爲了獲得完整的想法,作業鏈生成的中間文件被寫入磁盤並從磁盤讀取,因此會降低性能。 儘量避免鏈接作業

here如何連鎖工作。

2

需要鏈接的作業的經典示例是輸出按其頻率排序的詞的詞數。

你將需要:

工作1:

  • 輸入源映射器(發光字作爲重點,一個數值)
  • 聚集減速器(聚集字計數)

工作2:

  • 鍵/值交換映射器(使頻率作爲關鍵字,字作爲值)
  • 隱式標識減速器(得到由頻率已排序的單字,不必實現)

這裏是映射器的例子/ redurs以上:

public class HadoopWordCount { 


    public static class TokenizerMapper extends Mapper<Object, Text, Text, LongWritable> { 

    private final static Text word = new Text(); 
    private final static LongWritable one = new LongWritable(1); 

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 
     StringTokenizer itr = new StringTokenizer(value.toString()); 
     while (itr.hasMoreTokens()) { 
     word.set(itr.nextToken()); 
     context.write(word, one); 
     } 
    } 
    } 

    public static class KeyValueSwappingMapper extends Mapper<Text, LongWritable, LongWritable, Text> { 

    public void map(Text key, LongWritable value, Context context) throws IOException, InterruptedException { 
     context.write(value, key); 
    } 
    } 

    public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> { 
    private LongWritable result = new LongWritable(); 

    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, 
     InterruptedException { 
     long sum = 0; 
     for (LongWritable val : values) { 
     sum += val.get(); 
     } 
     result.set(sum); 
     context.write(key, result); 
    } 
    } 

這裏是驅動程序的例子。

它有兩個參數:

  1. 輸入文本文件來算的話。
  2. 的輸出目錄(不應該預先存在) - 尋找在這個{} DIR輸出/ OUT2 /一部分-R-0000文件

    public static void main(String[] args) throws Exception { 
    
    Configuration conf = new Configuration(); 
    Path out = new Path(args[1]); 
    
    Job job1 = Job.getInstance(conf, "word count"); 
    job1.setJarByClass(HadoopWordCount.class); 
    job1.setMapperClass(TokenizerMapper.class); 
    job1.setCombinerClass(SumReducer.class); 
    job1.setReducerClass(SumReducer.class); 
    job1.setOutputKeyClass(Text.class); 
    job1.setOutputValueClass(LongWritable.class); 
    job1.setOutputFormatClass(SequenceFileOutputFormat.class); 
    FileInputFormat.addInputPath(job1, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job1, new Path(out, "out1")); 
    if (!job1.waitForCompletion(true)) { 
        System.exit(1); 
    } 
    Job job2 = Job.getInstance(conf, "sort by frequency"); 
    job2.setJarByClass(HadoopWordCount.class); 
    job2.setMapperClass(KeyValueSwappingMapper.class); 
    job2.setNumReduceTasks(1); 
    job2.setSortComparatorClass(LongWritable.DecreasingComparator.class); 
    job2.setOutputKeyClass(LongWritable.class); 
    job2.setOutputValueClass(Text.class); 
    job2.setInputFormatClass(SequenceFileInputFormat.class); 
    FileInputFormat.addInputPath(job2, new Path(out, "out1")); 
    FileOutputFormat.setOutputPath(job2, new Path(out, "out2")); 
    if (!job2.waitForCompletion(true)) { 
        System.exit(1); 
    } 
    
    } 
    
+0

感謝很多Yurgis。讓我通過這個樣本。親切的問候。 –