2016-01-22 26 views
0
import java.io.*; 

import org.apache.hadoop.conf.*; 
import org.apache.hadoop.fs.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.*; 
import org.apache.hadoop.mapreduce.lib.input.*; 
import org.apache.hadoop.mapreduce.lib.output.*; 
import org.apache.hadoop.mapreduce.lib.partition.*; 
import org.apache.hadoop.mapreduce.lib.reduce.*; 
import org.apache.hadoop.util.*; 

/** 
* Demonstrates how to use Total Order Partitioner on Word Count. 
*/ 
public class TotalOrderPartitionerExample { 
public static class WordCount extends Configured implements Tool { 
private final static int REDUCE_TASKS = 8; 

    public static void main(String[] args) throws Exception { 
     int exitCode = ToolRunner.run(new WordCount(), args); 
     System.exit(exitCode); 
    } 

    @Override @SuppressWarnings({ "unchecked", "rawtypes" }) 
    public int run(String[] args) throws Exception { 
     // Check arguments. 
     if (args.length != 2) { 
     String usage = 
      "Usage: " + 
      "hadoop jar TotalOrderPartitionerExample$WordCount " + 
      "<input dir> <output dir>\n" 
     System.out.printf(usage); 
     System.exit(-1); 
     } 

     String jobName = "WordCount"; 
     String mapJobName = jobName + "-Map"; 
     String reduceJobName = jobName + "-Reduce"; 

     // Get user args. 
     String inputDir = args[0]; 
     String outputDir = args[1]; 

     // Define input path and output path. 
     Path mapInputPath = new Path(inputDir); 
     Path mapOutputPath = new Path(outputDir + "-inter"); 
     Path reduceOutputPath = new Path(outputDir); 

     // Define partition file path. 
     Path partitionPath = new Path(outputDir + "-part.lst"); 

     // Configure map-only job for sampling. 
     Job mapJob = new Job(getConf()); 
     mapJob.setJobName(mapJobName); 
     mapJob.setJarByClass(WordCount.class); 
     mapJob.setMapperClass(WordMapper.class); 
     mapJob.setNumReduceTasks(0); 
     mapJob.setOutputKeyClass(Text.class); 
     mapJob.setOutputValueClass(IntWritable.class); 
     TextInputFormat.setInputPaths(mapJob, mapInputPath); 

     // Set the output format to a sequence file. 
     mapJob.setOutputFormatClass(SequenceFileOutputFormat.class); 
     SequenceFileOutputFormat.setOutputPath(mapJob, mapOutputPath); 

     // Submit the map-only job. 
     int exitCode = mapJob.waitForCompletion(true) ? 0 : 1; 
     if (exitCode != 0) { return exitCode; } 

     // Set up the second job, the reduce-only. 
     Job reduceJob = new Job(getConf()); 
     reduceJob.setJobName(reduceJobName); 
     reduceJob.setJarByClass(WordCount.class); 

     // Set the input to the previous job's output. 
     reduceJob.setInputFormatClass(SequenceFileInputFormat.class); 
     SequenceFileInputFormat.setInputPaths(reduceJob, mapOutputPath); 

     // Set the output path to the final output path. 
     TextOutputFormat.setOutputPath(reduceJob, reduceOutputPath); 

     // Use identity mapper for key/value pairs in SequenceFile. 
     reduceJob.setReducerClass(IntSumReducer.class); 
     reduceJob.setMapOutputKeyClass(Text.class); 
     reduceJob.setMapOutputValueClass(IntWritable.class); 
     reduceJob.setOutputKeyClass(Text.class); 
     reduceJob.setOutputValueClass(IntWritable.class); 
     reduceJob.setNumReduceTasks(REDUCE_TASKS); 

     // Use Total Order Partitioner. 
     reduceJob.setPartitionerClass(TotalOrderPartitioner.class); 

     // Generate partition file from map-only job's output. 
     TotalOrderPartitioner.setPartitionFile(
      reduceJob.getConfiguration(), partitionPath); 
     InputSampler.writePartitionFile(reduceJob, new InputSampler.RandomSampler(
      1, 10000)); 

     // Submit the reduce job. 
     return reduceJob.waitForCompletion(true) ? 0 : 2; 
    } 
    } 

    public static class WordMapper extends 
     Mapper<LongWritable, Text, Text, IntWritable> { 
    @Override 
    public void map(LongWritable key, Text value, Context context) 
     throws IOException, InterruptedException { 
     String line = value.toString(); 
     for (String word : line.split("\\W+")) { 
     if (word.length() == 0) { continue; } 
     context.write(new Text(word), new IntWritable(1)); 
     } 
    } 
    } 

} 

我從GitHub的代碼。 我比較了地圖的縮短時間和縮短時間。 定期wordcount比總定單執行更好的工作表現。 這是爲什麼? 滿足平均性能所需的任何優化或更改? Hashpartitioner性能與TotalOrderPartitioner性能?Hadoop的總訂單分區程序

+0

爲什麼要比較wordcount的運行時間和使用特定分區程序的運行時間?我沒有看到你做出的比較... – vefthym

+0

由於總分區器輸出在所有減速器中映射鍵。我想檢查兩個分區器的減少時間。總訂單劃分器增加了嗎? – user2738965

回答

0

是,HashPartitioner將比TotalOrderPartitioner有更好的表現,因爲HashPartitioner沒有開銷或運行InputSampler和寫入分區的文件等等,當你需要一個全局有序輸出,將是

TotalOrderPartitioner僅用於比HashPartitioner慢。