Sorting in MapReduce is producing extra values. I am trying to sort a series of integers that are laid out in the following format:

A 2 
B 9 
C 4 
.... 
.... 
Z 42 

Here is the mapper and reducer code:

public static class MapClass extends MapReduceBase implements Mapper<Text, Text, IntWritable, Text>
{
    public void map(Text key, Text value, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException
    {
        // Emit the integer as the key so the framework sorts records by it
        output.collect(new IntWritable(Integer.parseInt(value.toString())), key);
    }
}

public static class Reduce extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text>
{
    public void reduce(IntWritable key, Iterator<Text> values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException
    {
        // Write each integer key once with an empty value
        output.collect(key, new Text(""));
    }
}

But the output contains many extra integers. Can anyone tell me what is wrong with the code?

Also, if possible, please point me to a good example of sorting integers with MapReduce.

Edit:

job.setInputFormat(KeyValueTextInputFormat.class); 
job.setOutputFormat(TextOutputFormat.class); 
job.setOutputKeyClass(IntWritable.class); 
job.setOutputValueClass(Text.class); 
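
For context, here is a minimal sketch of an old-API driver into which these four settings would slot. The class name, job name, and paths below are placeholders, since the full driver was not posted; MapClass and Reduce are assumed to be the classes shown above, nested in or imported into this class.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class IntSortDriver {
    public static void main(String[] args) throws IOException {
        // JobConf is the entry point of the old (org.apache.hadoop.mapred) API
        JobConf job = new JobConf(IntSortDriver.class);
        job.setJobName("integer-sort");

        // Placeholder paths: pass the real input/output locations as arguments
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // MapClass and Reduce are assumed to be the question's classes
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        // The four settings from the edit above
        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        JobClient.runJob(job);
    }
}
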
Can you share your entire code? We need to know what InputFormat you are using and what your logic is. Does your data have duplicate fields? How many reducers are you using? – Rags

Also, how big is the data? – Rags

It is a very small data set, at most 20 integers spread across 4 files (5 integers per file). I just wanted to test the program. I have edited the question to include the input format; please check. – Jyotiska

Answer

I tried following your logic, but using the new API. The result comes out correct.

Note: in the new API, the second parameter of the reduce(...) function is Iterable<Text>, not Iterator<Text>.
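
For comparison, a minimal sketch of the two reduce signatures side by side (method bodies elided):

// Old (org.apache.hadoop.mapred) API: values arrive as an Iterator
public void reduce(IntWritable key, Iterator<Text> values,
        OutputCollector<IntWritable, Text> output, Reporter reporter)
        throws IOException { ... }

// New (org.apache.hadoop.mapreduce) API: values arrive as an Iterable;
// @Override catches an accidental Iterator parameter, which would otherwise
// create an unused overload instead of overriding the framework method
@Override
public void reduce(IntWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException { ... }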

package stackoverflow; 

import java.io.IOException; 

import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 


public class q18076708 extends Configured implements Tool {

    static class MapClass extends Mapper<Text, Text, IntWritable, Text> {
        @Override
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit the integer as the key so the shuffle sorts by it
            context.write(new IntWritable(Integer.parseInt(value.toString())), key);
        }
    }

    static class Reduce extends Reducer<IntWritable, Text, IntWritable, Text> {
        @Override
        public void reduce(IntWritable key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            // Write each distinct integer once; the letters are discarded
            context.write(key, new Text(""));
        }
    }

    public int run(String[] args) throws Exception {
        // Run against the local file system with the local job runner for testing
        getConf().set("fs.default.name", "file:///");
        getConf().set("mapred.job.tracker", "local");
        Job job = new Job(getConf(), "Logging job");
        job.setJarByClass(getClass());

        FileInputFormat.addInputPath(job, new Path("src/test/resources/testinput.txt"));
        // Clear the output directory left over from any previous run
        FileSystem.get(getConf()).delete(new Path("target/out"), true);
        FileOutputFormat.setOutputPath(job, new Path("target/out"));

        job.setMapperClass(MapClass.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new q18076708(), args);
        System.exit(exitCode);
    }
}

Input:

A 2 
B 9 
C 4 
Z 42 

Output:

2 
4 
9 
42
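
Note that the sorting here comes entirely from the shuffle phase, which orders keys by IntWritable's natural comparator. With a single reducer the output is one globally sorted file; with several reducers each output file is sorted only within itself, and a globally sorted result would need something like TotalOrderPartitioner.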