2012-08-07 59 views

I want to join two files into one. I wrote two mappers to read them and one reducer to join them. How do I use JobControl in Hadoop?

    JobConf classifiedConf = new JobConf(new Configuration());
    classifiedConf.setJarByClass(myjob.class);
    classifiedConf.setJobName("classifiedjob");
    FileInputFormat.setInputPaths(classifiedConf, classifiedInputPath);
    classifiedConf.setMapperClass(ClassifiedMapper.class);
    classifiedConf.setMapOutputKeyClass(TextPair.class);
    classifiedConf.setMapOutputValueClass(Text.class);
    Job classifiedJob = new Job(classifiedConf);
    //first mapper config

    JobConf featureConf = new JobConf(new Configuration());
    featureConf.setJobName("featureJob");
    featureConf.setJarByClass(myjob.class);
    FileInputFormat.setInputPaths(featureConf, featuresInputPath);
    featureConf.setMapperClass(FeatureMapper.class);
    featureConf.setMapOutputKeyClass(TextPair.class);
    featureConf.setMapOutputValueClass(Text.class);
    Job featureJob = new Job(featureConf);
    //second mapper config

    JobConf joinConf = new JobConf(new Configuration());
    joinConf.setJobName("joinJob");
    joinConf.setJarByClass(myjob.class);
    joinConf.setReducerClass(JoinReducer.class);
    joinConf.setOutputKeyClass(Text.class);
    joinConf.setOutputValueClass(Text.class);
    Job joinJob = new Job(joinConf);
    //reducer config

    //JobControl config
    joinJob.addDependingJob(featureJob);
    joinJob.addDependingJob(classifiedJob);
    JobControl jobControl = new JobControl("jobControl");
    jobControl.addJob(classifiedJob);
    jobControl.addJob(featureJob);
    jobControl.addJob(joinJob);

    Thread thread = new Thread(jobControl);
    thread.start();
    // wait until every job in the group has finished, then stop the control thread
    while (!jobControl.allFinished()) {
        Thread.sleep(500);
    }
    jobControl.stop();

But I get these messages:

Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 
WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String). 

Can anyone help, please?

Answers


Which version of Hadoop are you using?

Does the warning you are getting actually stop the program?

You don't need to use setJarByClass(). As you can see from my code snippet, I can run it without using the setJarByClass() method.


You should implement your job like this:

public class MyApp extends Configured implements Tool { 

    public int run(String[] args) throws Exception { 
     // Configuration processed by ToolRunner 
     Configuration conf = getConf(); 

     // Create a JobConf using the processed conf 
     JobConf job = new JobConf(conf, MyApp.class); 

     // Process custom command-line options 
     Path in = new Path(args[1]); 
     Path out = new Path(args[2]); 

     // Specify various job-specific parameters  
     job.setJobName("my-app"); 
     job.setInputPath(in); 
     job.setOutputPath(out); 
     job.setMapperClass(MyMapper.class); 
     job.setReducerClass(MyReducer.class); 

     // Submit the job, then poll for progress until the job is complete 
     JobClient.runJob(job); 
     return 0; 
    } 

    public static void main(String[] args) throws Exception { 
     // Let ToolRunner handle generic command-line options 
     int res = ToolRunner.run(new Configuration(), new MyApp(), args); 

     System.exit(res); 
    } 
} 

This is straight out of the Hadoop documentation here.

So basically your job needs to extend Configured and implement Tool. That forces you to implement run(). Then start your job from your main class with ToolRunner.run(<your job>, <args>) and the warning will disappear.
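Applied to the question's JobControl setup, the driver might look roughly like the sketch below. This is only a sketch: `JoinDriver` is a hypothetical name, and `JoinReducer` plus the input/output paths are assumed from the question. Passing the driver class to the JobConf constructor sets the job jar, which also addresses the "No job jar file set" warning.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Sketch only: JoinReducer is the question's class and is assumed to exist;
// the two mapper jobs would be built the same way and added as dependencies.
public class JoinDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        // getConf() returns the Configuration already processed by ToolRunner,
        // so generic options such as -D are honored.
        // Passing JoinDriver.class sets the job jar (no "No job jar" warning).
        JobConf joinConf = new JobConf(getConf(), JoinDriver.class);
        joinConf.setJobName("joinJob");
        joinConf.setReducerClass(JoinReducer.class);
        joinConf.setOutputKeyClass(Text.class);
        joinConf.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(joinConf, new Path(args[1]));
        Job joinJob = new Job(joinConf);

        JobControl jobControl = new JobControl("jobControl");
        jobControl.addJob(joinJob);

        // JobControl is a Runnable: run it on its own thread and poll.
        Thread thread = new Thread(jobControl);
        thread.setDaemon(true);
        thread.start();
        while (!jobControl.allFinished()) {
            Thread.sleep(500);
        }
        jobControl.stop();
        return jobControl.getFailedJobs().isEmpty() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new JoinDriver(), args));
    }
}
```

This only runs against a Hadoop installation; it is meant to show the shape of a Tool-based JobControl driver, not a drop-in solution.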


You need to put this line in your driver: job.setJarByClass(MapperClassName.class);
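In other words, each JobConf should know which jar contains the user classes before the job is wrapped and submitted. A minimal sketch, reusing the question's names (myjob, classifiedConf):

```java
// In the driver, before wrapping the JobConf in a Job:
JobConf classifiedConf = new JobConf(new Configuration());
// setJarByClass locates the jar that contains the given class and
// sets it as the job jar, so the cluster can find the user classes.
classifiedConf.setJarByClass(myjob.class);
```

Without this (or an explicit setJar(String)), the "No job jar file set. User classes may not be found." warning appears and map/reduce tasks may fail with ClassNotFoundException on the cluster.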