我目前有兩個hadoop作業,其中第二個作業需要將第一個作業的輸出添加到分佈式緩存。目前我手動運行它們,因此在第一個作業完成後,我將輸出文件作爲參數傳遞給第二個作業,並將其添加到緩存中。在一個驅動程序中運行相關的hadoop作業
第一份工作僅僅是一張簡單的地圖工作,我希望我可以在按順序執行兩個作業時運行一個命令。
任何人都可以幫我拿出第一份工作的輸出放入分佈式緩存的代碼,以便它可以傳遞到第二份工作?
感謝
編輯: 這是工作1當前的驅動程序:
public class PlaceDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: PlaceMapper <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "Place Mapper");
job.setJarByClass(PlaceDriver.class);
job.setMapperClass(PlaceMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
TextInputFormat.addInputPath(job, new Path(otherArgs[0]));
TextOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
這是JOB2驅動程序。作業1的輸出被傳遞到工作2作爲第一個參數,並加載到高速緩存
public class LocalityDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 3) {
System.err.println("Usage: LocalityDriver <cache> <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "Job Name Here");
DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(),job.getConfiguration());
job.setNumReduceTasks(1); //TODO: Will change
job.setJarByClass(LocalityDriver.class);
job.setMapperClass(LocalityMapper.class);
job.setCombinerClass(TopReducer.class);
job.setReducerClass(TopReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
TextInputFormat.addInputPath(job, new Path(otherArgs[1]));
TextOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
您可以在這裏編寫代碼調用兩個作業,然後人可以幫助你修改它開始。 – adranale 2012-04-25 07:19:54
好吧,我已經添加了它 – 2012-04-25 07:35:19