2012-09-11 126 views
9

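(Hadoop) MapReduce - chaining jobs - JobControl doesn't stop

I need to chain two MapReduce jobs. I use JobControl to set job2 as dependent on job1. It works, and the output files are created, but it never stops. In the shell it stays in this state: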

12/09/11 19:06:24 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 
12/09/11 19:06:25 INFO input.FileInputFormat: Total input paths to process : 1 
12/09/11 19:06:25 INFO util.NativeCodeLoader: Loaded the native-hadoop library 
12/09/11 19:06:25 WARN snappy.LoadSnappy: Snappy native library not loaded 
12/09/11 19:07:00 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 
12/09/11 19:07:00 INFO input.FileInputFormat: Total input paths to process : 1 

How can I stop it? This is my main method.

public static void main(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    Configuration conf2 = new Configuration(); 

    Job job1 = new Job(conf, "canzoni"); 
    job1.setJarByClass(CanzoniOrdinate.class); 
    job1.setMapperClass(CanzoniMapper.class); 
    job1.setReducerClass(CanzoniReducer.class); 
    job1.setOutputKeyClass(Text.class); 
    job1.setOutputValueClass(IntWritable.class); 

    ControlledJob cJob1 = new ControlledJob(conf); 
    cJob1.setJob(job1); 
    FileInputFormat.addInputPath(job1, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job1, new Path("/user/hduser/tmp")); 


    Job job2 = new Job(conf2, "songsort"); 
    job2.setJarByClass(CanzoniOrdinate.class); 
    job2.setMapperClass(CanzoniSorterMapper.class); 
    job2.setSortComparatorClass(ReverseOrder.class); 
    job2.setInputFormatClass(KeyValueTextInputFormat.class); 
    job2.setReducerClass(CanzoniSorterReducer.class); 
    job2.setMapOutputKeyClass(IntWritable.class); 
    job2.setMapOutputValueClass(Text.class); 
    job2.setOutputKeyClass(Text.class); 
    job2.setOutputValueClass(IntWritable.class); 

    ControlledJob cJob2 = new ControlledJob(conf2); 
    cJob2.setJob(job2); 
    FileInputFormat.addInputPath(job2, new Path("/user/hduser/tmp/part*")); 
    FileOutputFormat.setOutputPath(job2, new Path(args[1])); 

    JobControl jobctrl = new JobControl("jobctrl"); 
    jobctrl.addJob(cJob1); 
    jobctrl.addJob(cJob2); 
    cJob2.addDependingJob(cJob1); 
    jobctrl.run(); 


    ////////////////
    // NEW CODE ///
    //////////////

    // Remove the blocking jobctrl.run() call above and run JobControl on its own thread instead.
    Thread t = new Thread(jobctrl);
    t.start();
    String oldStatusJ1 = null;
    String oldStatusJ2 = null;
    while (!jobctrl.allFinished()) {
        String status = cJob1.toString();
        String status2 = cJob2.toString();
        // Print each job's status only when it changes.
        if (!status.equals(oldStatusJ1)) {
            System.out.println(status);
            oldStatusJ1 = status;
        }
        if (!status2.equals(oldStatusJ2)) {
            System.out.println(status2);
            oldStatusJ2 = status2;
        }
    }
    System.exit(0);
}
}

+1

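I solved it by using a thread to start JobControl. I check whether the jobs have finished with a while loop, while (!jobctrl.allFinished()), and call System.exit() after the loop. Now I would like the jobs to report informational messages. The only information I get comes from ControlledJob.toString(), which tells me which job is running. I don't know how to obtain the other details: the number of mapper tasks, the number of reducer tasks, the input and output record counts, and so on. Any ideas on how to get these messages?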

+0

Would "job.getCounters().toString()" be enough? – zsxwing

+0

Is this a bug in the JobControl class? – Rags

Answers

5

I basically did what Peter mentioned above.

public class JobRunner implements Runnable {
    private JobControl control;

    public JobRunner(JobControl _control) {
        this.control = _control;
    }

    public void run() {
        this.control.run();
    }
}

and in my map/reduce class I have:

public void handleRun(JobControl control) throws InterruptedException { 
    JobRunner runner = new JobRunner(control); 
    Thread t = new Thread(runner); 
    t.start(); 

    while (!control.allFinished()) { 
     System.out.println("Still running..."); 
     Thread.sleep(5000); 
    } 
} 

where I pass in the JobControl object.

+2

+1 for providing a working example – beterthanlife

3

The JobControl object is itself a Runnable, so you can use it like this:

new Thread(myJobControlInstance).start();

0

Just a tweak to the code snippet that sinemetu1 shared.

You can drop the JobRunner wrapper and run JobControl directly in a new thread, because JobControl itself implements Runnable:

Thread thread = new Thread(jobControl);
thread.start();

while (!jobControl.allFinished()) {
    System.out.println("Still running...");
    Thread.sleep(5000);
}

I also stumbled upon this link: https://www.mail-archive.com/[email protected]/msg00556.html

0

Try this:

Thread jcThread = new Thread(jobControl);
jcThread.start();
System.out.println("Polling jobControl status >>>>>>>>>>>>>>>>");
while (true) {
    if (jobControl.allFinished()) {
        System.out.println("====>> jobControl.allFinished=" + jobControl.getSuccessfulJobList());
        jobControl.stop();
        // Without a break or return here, the program would loop forever.
        break;
    }

    if (jobControl.getFailedJobList().size() > 0) {
        succ = 0; // status flag declared elsewhere in the answerer's code (not shown)
        System.out.println("====>> jobControl.getFailedJobList=" + jobControl.getFailedJobList());
        jobControl.stop();

        // Without a break or return here, the program would loop forever.
        break;
    }
}
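
The answers above share one pattern: run the control object on its own thread, poll allFinished(), then call stop() so that run() returns. Below is a minimal, self-contained sketch of that pattern that runs without a Hadoop cluster. StandInJobControl is a hypothetical stand-in written for illustration, not Hadoop's JobControl. It only mimics the behaviour described in this thread, where run() keeps looping until stop() is called. The class names, the job count and the poll interval are all assumptions.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Hadoop's JobControl (illustration only).
// As described in this thread, run() keeps looping after all jobs are
// done and only returns once stop() has been called.
class StandInJobControl implements Runnable {
    private final AtomicInteger remainingJobs;
    private volatile boolean stopRequested = false;

    StandInJobControl(int jobs) {
        this.remainingJobs = new AtomicInteger(jobs);
    }

    @Override
    public void run() {
        while (!stopRequested) {
            // Pretend one job completes on each pass until none are left.
            if (remainingJobs.get() > 0) {
                remainingJobs.decrementAndGet();
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    boolean allFinished() {
        return remainingJobs.get() == 0;
    }

    void stop() {
        stopRequested = true;
    }
}

class JobControlPollingSketch {
    // Runs the control object on its own thread, polls until all jobs are
    // done, then stops it and waits for the control thread to exit.
    // Returns true if the control thread has terminated.
    static boolean runAndStop(StandInJobControl control, long pollMillis)
            throws InterruptedException {
        Thread t = new Thread(control, "job-control");
        t.start();
        while (!control.allFinished()) {
            Thread.sleep(pollMillis); // sleep between polls instead of busy-waiting
        }
        control.stop(); // without this call, run() would keep looping
        t.join();       // wait for the control thread to exit
        return !t.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        StandInJobControl control = new StandInJobControl(3);
        boolean terminated = runAndStop(control, 20);
        System.out.println("all finished: " + control.allFinished());
        System.out.println("control thread terminated: " + terminated);
    }
}
```

With the real JobControl, the same shape would presumably apply: stop() followed by join() lets main() return normally, so System.exit(0) may not be needed. That has not been verified here against any specific Hadoop version.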