2012-01-30 76 views
0

我目前使用用ruby編寫的mapper和reducer代碼運行流式作業。我想將它們轉換爲java。我不知道如何使用java運行EMR hadoop的流式作業。在Amazon的EMR網站上發佈的cloudburst樣本太複雜了。以下是我目前如何運行這些工作的詳細信息。EMR使用映射器和縮減器的Java代碼進行流式處理作業

代碼開始作業:

 elastic-mapreduce --create --alive --plain-output --master-instance-type m1.small 
--slave-instance-type m1.xlarge --num-instances 2 --name "Job Name" --bootstrap-action 
    s3://bucket-path/bootstrap.sh 

代碼中添加一個步驟:

elastic-mapreduce -j <job_id> --stream --step-name "my_step_name" 
--jobconf mapred.task.timeout=0 --mapper s3://bucket-path/mapper.rb 
--reducer s3://bucket-path/reducerRules.rb --cache s3://bucket-path/cache/cache.txt 
--input s3://bucket-path/input --output s3://bucket-path/output 

映射代碼從上面提到的電子病歷的緩存參數以及它讀取一個CSV文件中讀取從也有一些csv文件的輸入s3桶中,執行一些計算並將csv輸出行打印到標準輸出。

//mapper.rb 
CSV_OPTIONS = { 
    // some CSV options 
} 

begin 
    file = File.open("cache.txt") 
    while (line = file.gets) 
     // do something 
    end 
    file.close 
end 

input = FasterCSV.new(STDIN, CSV_OPTIONS) 
input.each{ 
// do calculations and get result 
puts (result) 
} 

//reducer.rb 

$stdin.each_line do |line| 
// do some aggregations and get aggregation_result 
if(some_condition) puts(aggregation_result) 
end 

回答

0

因爲現在我在Hadoop和MapReduce更好的大本營,這裏是我的預期:

啓動羣集,代碼將繼續在這個問題或多或少相同的,但我們可以增加配置參數:

ruby elastic-mapreduce --create --alive --plain-output --master-instance-type m1.xlarge --slave-instance-type m1.xlarge --num-instances 11 --name "Java Pipeline" --bootstrap-action s3://elasticmapreduce/bootstrap-actions/install-ganglia --bootstrap-action s3://elasticmapreduce/bootstrap-actions/configure-hadoop --args "--mapred-config-file, s3://com.versata.emr/conf/mapred-site-tuned.xml" 

要添加作業步驟:

第1步:

ruby elastic-mapreduce --jobflow <jobflo_id> --jar s3://somepath/job-one.jar --arg s3://somepath/input-one --arg s3://somepath/output-one --args -m,mapred.min.split.size=52880 -m,mapred.task.timeout=0

第二步:

ruby elastic-mapreduce --jobflow <jobflo_id> --jar s3://somepath/job-two.jar --arg s3://somepath/output-one --arg s3://somepath/output-two --args -m,mapred.min.split.size=52880 -m,mapred.task.timeout=0

現在作爲Java代碼,將會有這將包含下列每個類別中的一個實現一個主類:

  • org.apache.hadoop。 mapreduce.Mapper;
  • org.apache.hadoop.mapreduce.Reducer;

這些都必須覆蓋方法map()和reduce()來完成所需的工作。

所討論問題的Java類看起來像下面:

public class SomeJob extends Configured implements Tool { 

    private static final String JOB_NAME = "My Job"; 

    /** 
    * This is Mapper. 
    */ 
    public static class MapJob extends Mapper<LongWritable, Text, Text, Text> { 

     private Text outputKey = new Text(); 
     private Text outputValue = new Text(); 

     @Override 
     protected void setup(Context context) throws IOException, InterruptedException { 

      // Get the cached file 
      Path file = DistributedCache.getLocalCacheFiles(context.getConfiguration())[0]; 

      File fileObject = new File (file.toString()); 
      // Do whatever required with file data 
     } 

     @Override 
     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
      outputKey.set("Some key calculated or derived"); 
      outputVey.set("Some Value calculated or derived"); 
      context.write(outputKey, outputValue); 
     } 
     } 

    /** 
    * This is Reducer. 
    */ 
    public static class ReduceJob extends Reducer<Text, Text, Text, Text> { 

    private Text outputKey = new Text(); 
    private Text outputValue = new Text(); 

     @Override 
     protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, 
       InterruptedException { 
      outputKey.set("Some key calculated or derived"); 
      outputVey.set("Some Value calculated or derived"); 
      context.write(outputKey, outputValue); 
     } 
    } 

    @Override 
    public int run(String[] args) throws Exception { 

     try { 
      Configuration conf = getConf(); 
      DistributedCache.addCacheFile(new URI(args[2]), conf); 
      Job job = new Job(conf); 

      job.setJarByClass(TaxonomyOverviewReportingStepOne.class); 
      job.setJobName(JOB_NAME); 

      job.setMapperClass(MapJob.class); 
      job.setReducerClass(ReduceJob.class); 
      job.setOutputKeyClass(Text.class); 
      job.setOutputValueClass(Text.class); 

      job.setInputFormatClass(TextInputFormat.class); 
      job.setOutputFormatClass(TextOutputFormat.class); 
      job.setMapOutputKeyClass(Text.class); 
      job.setMapOutputValueClass(Text.class); 
      FileInputFormat.setInputPaths(job, args[0]); 
      FileOutputFormat.setOutputPath(job, new Path(args[1])); 

      boolean success = job.waitForCompletion(true); 
      return success ? 0 : 1; 
     } catch (Exception e) { 
      e.printStackTrace(); 
      return 1; 
     } 

    } 

    public static void main(String[] args) throws Exception { 

     if (args.length < 3) { 
      System.out 
        .println("Usage: SomeJob <comma sparated list of input directories> <output dir> <cache file>"); 
      System.exit(-1); 
     } 

     int result = ToolRunner.run(new TaxonomyOverviewReportingStepOne(), args); 
     System.exit(result); 
    } 

} 
0

如果您使用java,則不使用流式傳輸。您直接針對MapReduce API構建一個Jar。

退房有關如何做到這一點,包括臭名昭著的單詞計數一些很好的例子Hadoop的源的例子文件夾: https://github.com/apache/hadoop/tree/trunk/src/examples/org/apache/hadoop/examples

我不完全知道爲什麼要使用Java,而是直接編碼到API的可能是痛苦的。你可能想嘗試以下之一: Java項目:

非Java的:

FWIW我覺得豬很可能是我的選擇,並支持了電子病歷的開箱。

+0

你當然可以使用,即使你使用的是Java流。產生羣集後,你有jobflo_id,添加2個流程步驟,如下所示: 'ruby elastic-mapreduce --jobflow --jar s3://somepath/job-one.jar --arg s3:// somepath/input-one --arg s3:// somepath/output-one --args -m,mapred.min.split.size = 52880 -m,mapred.task.timeout = 0'和 'ruby elastic-mapreduce - jobflow --jar s3://somepath/job-one.jar --arg s3:// somepath/input-one --arg s3:// somepath/output-one --args -m,mapred.min。 split.size = 52880 -m,mapred.task.timeout = 0 ' – Amar 2012-11-06 22:11:42

相關問題