MapReduce: the result is incomplete
The content of wcin_file:
Run 1
access 1
default 2
out 2
project 1
task 1
windows 1
your 1
I want to use MapReduce to sort the data in wcin_file by the second field in descending order, like this:
default 2
out 2
access 1
...
But I found that the output file contains only two lines:
default 2
Run 1
Why? Here is some of the source code:
SortLogsMapper
public static class SortLogsMapper extends
        Mapper<Object, Text, Text, IntWritable> {
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // value is one whole input line, e.g. `Run 1` or `access 1`
        context.write(value, new IntWritable(0));
    }
}
SortLogsReducer
public static class SortLogsReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {
    private Text k = new Text();
    private IntWritable v = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        String[] fields = key.toString().split(" ");
        k.set(fields[0]);                   // first field: the word
        v.set(Integer.parseInt(fields[1])); // second field: the count
        context.write(k, v);
    }
}
LogDescComparator
public static class LogDescComparator extends WritableComparator {
    protected LogDescComparator() {
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        Text t1 = (Text) w1;
        Text t2 = (Text) w2;
        String[] t1Items = t1.toString().split("\t| ");
        String[] t2Items = t2.toString().split("\t| ");
        Integer t1Value = Integer.parseInt(t1Items[1]);
        Integer t2Value = Integer.parseInt(t2Items[1]);
        // Descending order on the second field only
        int comp = t2Value.compareTo(t1Value);
        return comp;
    }
}
Then I start the job in the main function:
Job job2 = new Job(conf2, "sort");
job2.setNumReduceTasks(1);
job2.setJarByClass(WordCount.class);
job2.setMapperClass(SortLogsMapper.class);
job2.setReducerClass(SortLogsReducer.class);
job2.setSortComparatorClass(LogDescComparator.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job2, new Path("wcin_file"));
FileOutputFormat.setOutputPath(job2, new Path("wcout"));
System.exit(job2.waitForCompletion(true) ? 0 : 1);
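A likely explanation, sketched in plain Java without Hadoop: when no grouping comparator is set, Hadoop falls back to the sort comparator to decide which keys belong to the same reduce() call. LogDescComparator returns 0 for any two lines with the same count, so all the `2` lines would form one group and all the `1` lines another, and the reducer writes only one record per group. The class name `GroupingSketch`, the method `firstKeyOfEachGroup`, and the tie-breaking comparator are hypothetical names introduced for this illustration; the model is a simplification of the reduce side, not Hadoop's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class GroupingSketch {
    // Same ordering as LogDescComparator: descending by the numeric second field only.
    static final Comparator<String> BY_COUNT_DESC =
            (a, b) -> Integer.compare(count(b), count(a));

    // Hypothetical alternative: break ties on the whole line,
    // so two different lines never compare as equal.
    static final Comparator<String> BY_COUNT_DESC_THEN_LINE =
            BY_COUNT_DESC.thenComparing(Comparator.naturalOrder());

    static int count(String line) {
        return Integer.parseInt(line.split("\t| ")[1]);
    }

    // Simplified model of the reduce side: sort the keys, start a new group
    // whenever the comparator reports a difference between adjacent keys,
    // and emit only the first key of each group (one reduce() call per group).
    static List<String> firstKeyOfEachGroup(List<String> lines, Comparator<String> cmp) {
        List<String> sorted = new ArrayList<>(lines);
        sorted.sort(cmp); // stable sort; Hadoop's own merge order may differ
        List<String> out = new ArrayList<>();
        String prev = null;
        for (String key : sorted) {
            if (prev == null || cmp.compare(prev, key) != 0) {
                out.add(key);
            }
            prev = key;
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Run 1", "access 1", "default 2", "out 2",
                "project 1", "task 1", "windows 1", "your 1");
        // Count-only comparator: two groups, so two output lines.
        System.out.println(firstKeyOfEachGroup(lines, BY_COUNT_DESC));
        // Tie-breaking comparator: every line is its own group.
        System.out.println(firstKeyOfEachGroup(lines, BY_COUNT_DESC_THEN_LINE));
    }
}
```

If this is indeed the cause, two options seem worth trying in the real job: make LogDescComparator fall back to comparing the full Text when the counts are equal, or keep the sort comparator as it is and register a separate grouping comparator with job2.setGroupingComparatorClass(...) that treats distinct lines as distinct keys.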