2010-03-23 162 views
110

在應用MapReduce的許多實際情況中,最終的算法最終都是幾個MapReduce步驟。在Hadoop中鏈接多個MapReduce作業

即Map1,Reduce1,Map2,Reduce2等等。

因此,您需要將最後一次減少的輸出作爲下一個地圖的輸入。

中間數據是您(一般情況下)在管道成功完成後不想保留的內容。另外,因爲這個中間數據通常是一些數據結構(如'map'或'set'),所以您不希望在編寫和讀取這些鍵值對時付出太多努力。

在Hadoop中這樣做的推薦方式是什麼?

是否有一個(簡單)示例說明如何以正確的方式處理這些中間數據,包括之後的清理?

+2

使用其MapReduce框架? – skaffman 2010-03-23 12:03:03

+1

我編輯了這個問題來澄清我正在談論的Hadoop。 – 2010-03-23 13:11:05

+0

我推薦這個swineherd gem:https://github.com/Ganglion/swineherd best,Tobias – Tobias 2011-06-15 09:33:36

回答

52

我覺得這個教程雅虎開發者網絡將幫助你這樣的:Chaining Jobs

您使用JobClient.runJob()。來自第一份工作的數據的輸出路徑成爲您的第二份工作的輸入路徑。這些需要作爲參數傳遞給您的作業,並使用適當的代碼來解析它們併爲作業設置參數。

我認爲上述方法可能是現在較老的mapred API的做法,但它應該仍然有效。在新的mapreduce API中會有類似的方法,但我不確定它是什麼。

只要在作業完成後刪除中間數據,您可以在代碼中執行此操作。我之前已經做了它的方式是使用類似:

FileSystem.delete(Path f, boolean recursive); 

所在路徑是數據的HDFS的位置。您需要確保只有在沒有其他工作需要時才刪除此數據。

+2

感謝您的雅虎教程鏈接。鏈接工作確實是你想要的,如果兩者都在同一個運行。我一直在尋找的是如果你想能夠單獨運行它們的簡單方法。在上述教程中,我發現SequenceFileOutputFormat「將適合讀取的二進制文件寫入後續的MapReduce作業」以及匹配的SequenceFileInputFormat,這使得它非常容易完成。謝謝。 – 2010-05-17 09:14:40

7

實際上有很多方法可以做到這一點。我將專注於兩個。

一個是通過Riffle(http://github.com/cwensel/riffle)一個註釋庫來識別相關事物並以依賴(拓撲)順序「執行」它們。

或者您可以在級聯(http://www.cascading.org/)中使用級聯(和MapReduceFlow)。未來版本將支持Riffle註釋,但現在使用原始MR JobConf作業效果很好。

一個變種就是不用手工管理MR作業,而是使用Cascading API開發應用程序。然後,JobConf和作業鏈通過Cascading planner和Flow類在內部處理。

這樣你就可以專注於自己的問題,而不是管理Hadoop作業的機制等等。甚至可以在頂層(如clojure或jruby)上層疊不同的語言,以進一步簡化開發和應用程序。 http://www.cascading.org/modules.html

17

有很多方法可以做到這一點。

(1)層疊作業

創建針對第一作業的JobConf對象「JOB1」,並設置所有的參數以「輸入」作爲inputdirectory和「溫度」作爲輸出目錄。執行此任務:

JobClient.run(job1). 

緊下方,創建第二個作業的JobConf對象「作業2」,並設置所有的參數與「溫度」爲inputdirectory和「輸出」的輸出目錄。執行此任務:

JobClient.run(job2). 

(2)創建兩個JobConf對象和設置的所有參數在他們就像(1),只是你不使用JobClient.run。

然後創建兩個工作與jobconfs對象作爲參數:

Job job1=new Job(jobconf1); 
Job job2=new Job(jobconf2); 

使用JobControl作業控制對象時,你指定作業依賴關係,然後運行的作業:

JobControl jbcntrl=new JobControl("jbcntrl"); 
jbcntrl.addJob(job1); 
jbcntrl.addJob(job2); 
job2.addDependingJob(job1); 
jbcntrl.run(); 

(3)如果你需要一個像Map + |的結構減少| Map *,您可以使用Hadoop版本0.19及之後的ChainMapper和ChainReducer類。

乾杯

1

雖然有基於複雜的服務器Hadoop的工作流引擎如Oozie的,我有一個簡單的Java庫,使多個的Hadoop執行作爲工作流程的工作。定義內部作業依賴關係的作業配置和工作流程在JSON文件中配置。所有內容都可以在外部進行配置,並且不需要對現有地圖縮減實施進行任何更改即可成爲工作流程的一部分。

詳情可以在這裏找到。源代碼和jar在github中可用。

http://pkghosh.wordpress.com/2011/05/22/hadoop-orchestration/

普拉納布·

1

我覺得Oozie的幫助隨之而來的就業機會,直接從以前的任務接收輸入。這可以避免使用jobcontrol執行的I/O操作。

3

我們可以利用Job的waitForCompletion(true)方法來定義作業之間的依賴關係。

在我的場景中,我有3個互相依賴的工作。在驅動程序類中,我使用了下面的代碼,它按預期工作。

public static void main(String[] args) throws Exception { 
     // TODO Auto-generated method stub 

     CCJobExecution ccJobExecution = new CCJobExecution(); 

     Job distanceTimeFraudJob = ccJobExecution.configureDistanceTimeFraud(new Configuration(),args[0], args[1]); 
     Job spendingFraudJob = ccJobExecution.configureSpendingFraud(new Configuration(),args[0], args[1]); 
     Job locationFraudJob = ccJobExecution.configureLocationFraud(new Configuration(),args[0], args[1]); 

     System.out.println("****************Started Executing distanceTimeFraudJob ================"); 
     distanceTimeFraudJob.submit(); 
     if(distanceTimeFraudJob.waitForCompletion(true)) 
     { 
      System.out.println("=================Completed DistanceTimeFraudJob================= "); 
      System.out.println("=================Started Executing spendingFraudJob ================"); 
      spendingFraudJob.submit(); 
      if(spendingFraudJob.waitForCompletion(true)) 
      { 
       System.out.println("=================Completed spendingFraudJob================= "); 
       System.out.println("=================Started locationFraudJob================= "); 
       locationFraudJob.submit(); 
       if(locationFraudJob.waitForCompletion(true)) 
       { 
        System.out.println("=================Completed locationFraudJob================="); 
       } 
      } 
     } 
    } 
+0

你的答案是關於如何在執行方面加入這些工作。最初的問題是關於最好的數據結構。所以你的答案與這個具體問題無關。 – 2013-01-28 21:03:45

1

如果您想以編程方式鏈接您的作業,您將無法使用JobControl。用法很簡單:

JobControl jobControl = new JobControl(name); 

然後添加ControlledJob實例。 ControlledJob使用它的依賴性定義作業,從而自動插入輸入和輸出以適應作業的「鏈」。

jobControl.add(new ControlledJob(job, Arrays.asList(controlledjob1, controlledjob2)); 

    jobControl.run(); 

啓動鏈。你會想要把它放在一個敏捷的線程中。這允許檢查鏈條的狀況得到控制而它運行:

while (!jobControl.allFinished()) { 
     System.out.println("Jobs in waiting state: " + jobControl.getWaitingJobList().size()); 
     System.out.println("Jobs in ready state: " + jobControl.getReadyJobsList().size()); 
     System.out.println("Jobs in running state: " + jobControl.getRunningJobList().size()); 
     List<ControlledJob> successfulJobList = jobControl.getSuccessfulJobList(); 
     System.out.println("Jobs in success state: " + successfulJobList.size()); 
     List<ControlledJob> failedJobList = jobControl.getFailedJobList(); 
     System.out.println("Jobs in failed state: " + failedJobList.size()); 
    } 
5

我已經做的工作​​與使用對象JobConf一前一後鏈接。我以WordCount爲例鏈接工作。一份工作計算出在給定輸出中一個詞重複了多少次。第二份工作將第一份工作輸出作爲輸入,並計算出給定輸入中的總字數。以下是需要放置在Driver類中的代碼。

//First Job - Counts, how many times a word encountered in a given file 
    JobConf job1 = new JobConf(WordCount.class); 
    job1.setJobName("WordCount"); 

    job1.setOutputKeyClass(Text.class); 
    job1.setOutputValueClass(IntWritable.class); 

    job1.setMapperClass(WordCountMapper.class); 
    job1.setCombinerClass(WordCountReducer.class); 
    job1.setReducerClass(WordCountReducer.class); 

    job1.setInputFormat(TextInputFormat.class); 
    job1.setOutputFormat(TextOutputFormat.class); 

    //Ensure that a folder with the "input_data" exists on HDFS and contains the input files 
    FileInputFormat.setInputPaths(job1, new Path("input_data")); 

    //"first_job_output" contains data that how many times a word occurred in the given file 
    //This will be the input to the second job. For second job, input data name should be 
    //"first_job_output". 
    FileOutputFormat.setOutputPath(job1, new Path("first_job_output")); 

    JobClient.runJob(job1); 


    //Second Job - Counts total number of words in a given file 

    JobConf job2 = new JobConf(TotalWords.class); 
    job2.setJobName("TotalWords"); 

    job2.setOutputKeyClass(Text.class); 
    job2.setOutputValueClass(IntWritable.class); 

    job2.setMapperClass(TotalWordsMapper.class); 
    job2.setCombinerClass(TotalWordsReducer.class); 
    job2.setReducerClass(TotalWordsReducer.class); 

    job2.setInputFormat(TextInputFormat.class); 
    job2.setOutputFormat(TextOutputFormat.class); 

    //Path name for this job should match first job's output path name 
    FileInputFormat.setInputPaths(job2, new Path("first_job_output")); 

    //This will contain the final output. If you want to send this jobs output 
    //as input to third job, then third jobs input path name should be "second_job_output" 
    //In this way, jobs can be chained, sending output one to other as input and get the 
    //final output 
    FileOutputFormat.setOutputPath(job2, new Path("second_job_output")); 

    JobClient.runJob(job2); 

命令來運行這些作業是:

斌/ Hadoop的罐子TotalWords。

我們需要給出命令的最終作業名稱。在上述情況下,它是TotalWords。

4

您可以按照代碼中給出的方式運行MR鏈。

請注意:只有驅動程序代碼已經提供

public class WordCountSorting { 
// here the word keys shall be sorted 
     //let us write the wordcount logic first 

     public static void main(String[] args)throws IOException,InterruptedException,ClassNotFoundException { 
      //THE DRIVER CODE FOR MR CHAIN 
      Configuration conf1=new Configuration(); 
      Job j1=Job.getInstance(conf1); 
      j1.setJarByClass(WordCountSorting.class); 
      j1.setMapperClass(MyMapper.class); 
      j1.setReducerClass(MyReducer.class); 

      j1.setMapOutputKeyClass(Text.class); 
      j1.setMapOutputValueClass(IntWritable.class); 
      j1.setOutputKeyClass(LongWritable.class); 
      j1.setOutputValueClass(Text.class); 
      Path outputPath=new Path("FirstMapper"); 
      FileInputFormat.addInputPath(j1,new Path(args[0])); 
        FileOutputFormat.setOutputPath(j1,outputPath); 
        outputPath.getFileSystem(conf1).delete(outputPath); 
      j1.waitForCompletion(true); 
        Configuration conf2=new Configuration(); 
        Job j2=Job.getInstance(conf2); 
        j2.setJarByClass(WordCountSorting.class); 
        j2.setMapperClass(MyMapper2.class); 
        j2.setNumReduceTasks(0); 
        j2.setOutputKeyClass(Text.class); 
        j2.setOutputValueClass(IntWritable.class); 
        Path outputPath1=new Path(args[1]); 
        FileInputFormat.addInputPath(j2, outputPath); 
        FileOutputFormat.setOutputPath(j2, outputPath1); 
        outputPath1.getFileSystem(conf2).delete(outputPath1, true); 
        System.exit(j2.waitForCompletion(true)?0:1); 
     } 

} 

序列是

JOB1)MAP-> REDUCE->(JOB2)MAP
這樣做爲了獲得排序的鍵還有更多的方法,如使用樹形圖
但是我想把你的注意力集中到喬布斯鏈接的方式!
謝謝

2

新類org.apache.hadoop.mapreduce.lib.chain.ChainMapper幫助這種情況下

+1

答案是好的 - 但你應該添加一些關於它做些什麼的細節,或者至少是一個API參考鏈接,這樣人們才能投票 – 2016-11-09 17:50:50

+0

ChainMapper和ChainReducer用於在Reduce之前有一個或多個映射器,並且有0個或更多Reduce規範之後的映射器。 (Mapper +)減少(Mapper *)。如果我明顯錯了,請糾正我,但我不認爲這種方法能夠按照OP的要求完成連續的工作。 – rahul1210 2017-04-12 05:53:31

0

正如您在您的要求已提到想要MRJob1的O/P是我/ p MRJob2等等,你可以考慮使用oozie工作流來處理這個用例。您也可以考慮將您的中間數據寫入HDFS,因爲它將被下一個MRJob使用。作業完成後,您可以清理中間數據。

<start to="mr-action1"/> 
<action name="mr-action1"> 
    <!-- action for MRJob1--> 
    <!-- set output path = /tmp/intermediate/mr1--> 
    <ok to="end"/> 
    <error to="end"/> 
</action> 

<action name="mr-action2"> 
    <!-- action for MRJob2--> 
    <!-- set input path = /tmp/intermediate/mr1--> 
    <ok to="end"/> 
    <error to="end"/> 
</action> 

<action name="success"> 
     <!-- action for success--> 
    <ok to="end"/> 
    <error to="end"/> 
</action> 

<action name="fail"> 
     <!-- action for fail--> 
    <ok to="end"/> 
    <error to="end"/> 
</action> 

<end name="end"/> 

相關問題