2013-10-29 54 views
0

我試圖用下面的代碼鏈接hadoop中的兩個順序M/R作業。基本上,在第一份工作完成之後,我做了另一份工作,將第一份工作的輸出作爲輸入。但是代碼不會爲第二個工作產生輸出,並且不會拋出任何異常。你們能幫我找出哪裏可能出錯嗎?我很感激。Hadoop M/R作業鏈無法正常工作

public static void main(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    String[] otherArgs = new GenericOptionsParser(conf, args) 
      .getRemainingArgs(); 
    if (otherArgs.length != 3) { 
     System.err.println("Usage: jobStats <in> <out> <job>"); 
     System.exit(2); 
    } 


    conf.set("job", otherArgs[2]); 
    Job job = new Job(conf, "job count"); 
    job.setJarByClass(jobStats.class); 
    job.setMapperClass(jobMapper.class); 
    job.setCombinerClass(jobReducer.class); 
    job.setReducerClass(jobReducer.class); 

    job.setMapOutputKeyClass(Text.class);   
    job.setMapOutputValueClass(IntWritable.class);   
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 
    FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 

    boolean completionStatus1 = job.waitForCompletion(true); 
    if (completionStatus1 == true) 
    { 
     Job job2 = new Job(conf, "job year ranking"); 
     job2.setJarByClass(jobStats.class); 
     job2.setPartitionerClass(ChainedPartitioner.class); 
     job2.setGroupingComparatorClass(CompKeyGroupingComparator.class); 
     job2.setSortComparatorClass(CompKeyComparator.class); 

     job2.setMapperClass(ChainedMapper.class); 
     job2.setReducerClass(ChainedReducer.class); 
     job2.setPartitionerClass(ChainedPartitioner.class); 
     job2.setMapOutputKeyClass(CompositeKey.class); 
     job2.setMapOutputValueClass(IntWritable.class); 
     job2.setOutputKeyClass(Text.class); 
     job2.setOutputValueClass(IntWritable.class); 

     Path outPath = new Path(otherArgs[1] + "part-r-00000"); // this is the hard-coded output of first job 
     FileSystem fs = FileSystem.get(conf); 
     if (fs.exists(outPath)) 
     { 
      FileInputFormat.addInputPath(job2, outPath); 
      FileOutputFormat.setOutputPath(job2, new Path("/user/tony/output/today")); 

      boolean completionStatus2 = job2.waitForCompletion(true); 
      if (completionStatus2 == true) 
      { 
       fs.delete(outPath, true); 
       System.exit(0); 
      } 
      else System.exit(1); 
     } 
     else System.exit(1); 
    } 
} 
+0

程序的退出代碼是什麼? –

+0

如果作業成功完成,則返回0,否則返回1 – TonyGW

+0

您需要確保最終的縮減器實際上正在寫些東西 –

回答

0

ChainedMapper和ChainedReducer類用於在一個Map Reduce作業中將多個mapper串在一起。像M1-M2-M3-R-M4-M5。

在你的情況下,你想要連續運行兩個完整的地圖縮減作業。只需爲第二份工作指定一張真實地圖即可。

+0

我的理解是ChainedMapper/Reducer對我不起作用,因爲我有兩個減速器需要被鏈接,並且ChainedMapper/Reducer類只允許一個reducer,對吧? – TonyGW

+0

這是正確的。你應該連續運行兩個工作,就像你正在做的那樣。但是你提供給OP的第二個工作的映射器是一個帶有0個組件映射器的鏈式映射器。 –

+0

我的第二個映射器就是ChainedMapper.java,這可能是與系統ChainedMapper的衝突吧? – TonyGW