2012-10-01 74 views
5

全部沒有調用Hadoop減速器

我有簡單的map/reduce實現。 Mapper被調用,它完成它的工作,但是reducer永遠不會被調用。

這裏是映射:

static public class InteractionMap extends Mapper<LongWritable, Text, Text, InteractionWritable> { 

    @Override 
    protected void map(LongWritable offset, Text text, Context context) throws IOException, InterruptedException { 
     System.out.println("mapper"); 
     String[] tokens = text.toString().split(","); 
     for (int idx = 0; idx < tokens.length; idx++) { 
      String sourceUser = tokens[1]; 
      String targetUser = tokens[2]; 
      int points = Integer.parseInt(tokens[4]); 
      context.write(new Text(sourceUser), new InteractionWritable(targetUser, points)); 
      } 
     } 
    } 
} 

這裏是我的減速器:

static public class InteractionReduce extends Reducer<Text, InteractionWritable, Text, Text> { 

    @Override 
    protected void reduce(Text token, Iterable<InteractionWritable> counts, Context context) throws IOException, InterruptedException { 
     System.out.println("REDUCER"); 
     Iterator<InteractionWritable> i = counts.iterator(); 
     while (i.hasNext()) { 
      InteractionWritable interaction = i.next(); 
      context.write(token, new Text(token.toString() + " " + interaction.getTargetUser().toString() + " " + interaction.getPoints().get())); 
     } 
    } 

} 

而且,這裏是配置部分:

@Override 
public int run(String[] args) throws Exception { 
    Configuration configuration = getConf(); 
    Job job = new Job(configuration, "Interaction Count"); 
    job.setJarByClass(InteractionMapReduce.class); 
    job.setMapperClass(InteractionMap.class); 
    job.setCombinerClass(InteractionReduce.class); 
    job.setReducerClass(InteractionReduce.class); 
    job.setInputFormatClass(TextInputFormat.class); 
    job.setOutputFormatClass(TextOutputFormat.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 
    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 
    return job.waitForCompletion(true) ? 0 : -1; 
} 

沒有人有任何想法,爲什麼減速不被調用?

+0

你能分享你的計數器的地圖,合併器和減速器輸入/輸出計數? –

回答

6

好吧,這是我的錯,正如所料。作業配置不好。 這是應該的樣子:

Configuration configuration = getConf(); 

Job job = new Job(configuration, "Interaction Count"); 
job.setJarByClass(InteractionMapReduce.class); 
job.setMapperClass(InteractionMap.class); 
job.setReducerClass(InteractionReduce.class); 
job.setMapOutputKeyClass(Text.class); 
job.setMapOutputValueClass(InteractionWritable.class); 
job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(Text.class); 

FileInputFormat.addInputPath(job, new Path(args[0])); 
FileOutputFormat.setOutputPath(job, new Path(args[1])); 

return job.waitForCompletion(true) ? 0 : -1; 

問題的發生是因爲Map和Reduce階段有不同的輸出類型。在調用context.write方法之後,作業無聲無息地失敗。所以,我不得不補充的是這些行:

job.setMapOutputKeyClass(Text.class); 
job.setMapOutputValueClass(InteractionWritable.class); 
job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(Text.class); 
0
  1. 我希望Mapper方法中的text有一些數據。
  2. 您真的需要ReducerCombiner以及Reducer

我總是有一個主類InteractionMapReduce和裏面我有InteractionMapInteractionReduce類。

所以在設置MapperReducer類的工作,我把他們像InteractionMapReduce.InteractionMap.classInteractionMapReduce.InteractionReduce.class

我不知道這是否會幫助你,但你可以嘗試。

+0

1.它。輸入數據大約有1000行csv文本。 2.我不這樣做,但將它作爲組合器移除並不能解決問題。 – ezamur

+0

增加了一個點。 – JHS

+0

看到它。減速器簽名是好的,我用api文檔檢查它。 – ezamur