2012-04-25 83 views
1

我目前有兩個hadoop作業,其中第二個作業需要將第一個作業的輸出添加到分佈式緩存。目前我手動運行它們,因此在第一個作業完成後,我將輸出文件作爲參數傳遞給第二個作業,並將其添加到緩存中。在一個驅動程序中運行相關的hadoop作業

第一份工作僅僅是一張簡單的地圖工作,我希望我可以在按順序執行兩個作業時運行一個命令。

任何人都可以幫我拿出第一份工作的輸出放入分佈式緩存的代碼,以便它可以傳遞到第二份工作?

感謝

編輯: 這是工作1當前的驅動程序:

public class PlaceDriver { 

public static void main(String[] args) throws Exception { 

    Configuration conf = new Configuration(); 
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 
    if (otherArgs.length != 2) { 
     System.err.println("Usage: PlaceMapper <in> <out>"); 
     System.exit(2); 
    } 
    Job job = new Job(conf, "Place Mapper"); 
    job.setJarByClass(PlaceDriver.class); 
    job.setMapperClass(PlaceMapper.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 
    TextInputFormat.addInputPath(job, new Path(otherArgs[0])); 
    TextOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 
    System.exit(job.waitForCompletion(true) ? 0 : 1); 
} 
} 

這是JOB2驅動程序。作業1的輸出被傳遞到工作2作爲第一個參數,並加載到高速緩存

public class LocalityDriver { 

public static void main(String[] args) throws Exception { 

    Configuration conf = new Configuration(); 
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 
    if (otherArgs.length != 3) { 
     System.err.println("Usage: LocalityDriver <cache> <in> <out>"); 
     System.exit(2); 
    } 
    Job job = new Job(conf, "Job Name Here"); 
    DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(),job.getConfiguration()); 
    job.setNumReduceTasks(1); //TODO: Will change 
    job.setJarByClass(LocalityDriver.class); 
    job.setMapperClass(LocalityMapper.class); 
    job.setCombinerClass(TopReducer.class); 
    job.setReducerClass(TopReducer.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 
    TextInputFormat.addInputPath(job, new Path(otherArgs[1])); 
    TextOutputFormat.setOutputPath(job, new Path(otherArgs[2])); 
    System.exit(job.waitForCompletion(true) ? 0 : 1); 
} 
} 
+0

您可以在這裏編寫代碼調用兩個作業,然後人可以幫助你修改它開始。 – adranale 2012-04-25 07:19:54

+0

好吧,我已經添加了它 – 2012-04-25 07:35:19

回答

0

一個直接的答案將是的兩個主要方法的代碼解壓縮到例如兩個單獨的方法:boolean job1()boolean job2()並呼籲他們的主要方法後,對方這樣的:

public static void main(String[] args) throws Exception { 
    if (job1()) { 
     jobs2(); 
    } 
} 

其中job1job2調用的返回值是調用的結果job.waitForCompletion(true)

+0

然後如何將job1的輸出加載到job2的分佈式緩存中? – 2012-04-25 08:55:18

+0

據我所知,'新路徑(otherArgs [1])'被正確設置爲第一個輸出和第二個工作的輸入。 – adranale 2012-04-25 09:00:49

+0

換句話說,輸出第一個作業到臨時目錄? – 2012-04-25 09:40:43

0

MapReduce中的作業鏈很常見。您可以試試cascading,一款開源的MapReduce工作流程管理軟件。還有一些關於級聯進行的討論here。或者您可以查看與您的here類似的討論。

1

在同一主體中創建兩個作業對象。在運行另一個之前,讓第一個人等待完成。

public class DefaultTest extends Configured implements Tool{ 


    public int run(String[] args) throws Exception { 

     Job job = new Job(); 

     job.setJobName("DefaultTest-blockx15"); 

     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(IntWritable.class); 

     job.setMapperClass(Map.class); 
     job.setReducerClass(Reduce.class); 

     job.setNumReduceTasks(15); 

     job.setInputFormatClass(TextInputFormat.class); 
     job.setOutputFormatClass(TextOutputFormat.class); 

     FileInputFormat.setInputPaths(job, new Path(args[0])); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 

     job.setJarByClass(DefaultTest.class); 

     job.waitForCompletion(true): 

       job2 = new Job(); 

       // define your second job with the input path defined as the output of the previous job. 


     return 0; 
    } 

    public static void main(String[] args) throws Exception { 
     Configuration conf = new Configuration(); 
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 
     ToolRunner.run(new DefaultTest(), otherArgs); 
    } 
} 
0

您還可以使用ChainMapper,JobControl作業控制和ControlledJob來支配自己的工作流

Configuration config = getConf(); 

Job j1 = new Job(config); 
Job j2 = new Job(config); 
Job j3 = new Job(config); 

j1.waitForCompletion(true); 


JobControl jobFlow = new JobControl("j2"); 
ControlledJob cj3 = new ControlledJob(j2, null); 
jobFlow.addJob(cj3); 
jobFlow.addJob(new ControlledJob(j2, Lists.newArrayList(cj3))); 
jobFlow.addJob(new ControlledJob(j3, null)); 
jobFlow.run(); 
相關問題