Now that I am on more familiar ground with Hadoop and MapReduce, here is what I would expect:
To launch the cluster, the command remains more or less the same as in the question, but we can add configuration parameters:
ruby elastic-mapreduce --create --alive --plain-output --master-instance-type m1.xlarge --slave-instance-type m1.xlarge --num-instances 11 --name "Java Pipeline" --bootstrap-action s3://elasticmapreduce/bootstrap-actions/install-ganglia --bootstrap-action s3://elasticmapreduce/bootstrap-actions/configure-hadoop --args "--mapred-config-file, s3://com.versata.emr/conf/mapred-site-tuned.xml"
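For reference, the mapred-site-tuned.xml handed to the configure-hadoop bootstrap action above is an ordinary Hadoop site configuration file. A minimal sketch might look like the following; the two property names are standard MRv1 keys, but the values are illustrative assumptions, not the ones used in the original setup:

<?xml version="1.0"?>
<configuration>
  <!-- Illustrative values only; tune for the chosen instance types -->
  <property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>4</value>
  </property>
  <property>
    <name>mapred.child.java.opts</name>
    <value>-Xmx1024m</value>
  </property>
</configuration>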
To add the job steps:
Step 1:
ruby elastic-mapreduce --jobflow <jobflow_id> --jar s3://somepath/job-one.jar --arg s3://somepath/input-one --arg s3://somepath/output-one --args -m,mapred.min.split.size=52880 -m,mapred.task.timeout=0
Step 2:
ruby elastic-mapreduce --jobflow <jobflow_id> --jar s3://somepath/job-two.jar --arg s3://somepath/output-one --arg s3://somepath/output-two --args -m,mapred.min.split.size=52880 -m,mapred.task.timeout=0
Note that the output of step 1 (output-one) is passed as the input of step 2; this is how the two jobs are chained.
Now, as for the Java code, there will be a main class containing one implementation of each of the following:
- org.apache.hadoop.mapreduce.Mapper
- org.apache.hadoop.mapreduce.Reducer
Each of these must override the map() and reduce() methods to do the required work.
The Java class for the problem in question would look like the following:
import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SomeJob extends Configured implements Tool {
    private static final String JOB_NAME = "My Job";

    /**
     * This is the Mapper.
     */
    public static class MapJob extends Mapper<LongWritable, Text, Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Get the cached file
            Path file = DistributedCache.getLocalCacheFiles(context.getConfiguration())[0];
            File fileObject = new File(file.toString());
            // Do whatever is required with the file data
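            // Illustrative sketch only (an assumption, not part of the original
            // answer): e.g. read the cached file line by line to build an
            // in-memory lookup used later by map().
            java.io.BufferedReader reader = new java.io.BufferedReader(new java.io.FileReader(fileObject));
            String line;
            while ((line = reader.readLine()) != null) {
                // parse each line as the job requires
            }
            reader.close();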
        }

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            outputKey.set("Some key calculated or derived");
            outputValue.set("Some value calculated or derived");
            context.write(outputKey, outputValue);
        }
    }
    /**
     * This is the Reducer.
     */
    public static class ReduceJob extends Reducer<Text, Text, Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            outputKey.set("Some key calculated or derived");
            outputValue.set("Some value calculated or derived");
            context.write(outputKey, outputValue);
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        try {
            Configuration conf = getConf();
            DistributedCache.addCacheFile(new URI(args[2]), conf);

            Job job = new Job(conf);
            job.setJarByClass(SomeJob.class);
            job.setJobName(JOB_NAME);

            job.setMapperClass(MapJob.class);
            job.setReducerClass(ReduceJob.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            FileInputFormat.setInputPaths(job, args[0]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            boolean success = job.waitForCompletion(true);
            return success ? 0 : 1;
        } catch (Exception e) {
            e.printStackTrace();
            return 1;
        }
    }
    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            System.out.println("Usage: SomeJob <comma separated list of input directories> <output dir> <cache file>");
            System.exit(-1);
        }
        int result = ToolRunner.run(new SomeJob(), args);
        System.exit(result);
    }
}
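Before adding the steps above, this class has to be compiled, packaged into the jar referenced by the step (job-one.jar), and uploaded to S3. As a quick local sanity check, a sketch of an invocation against a local Hadoop installation could look like the following, assuming the jar was built without a Main-Class manifest entry (otherwise the class name is omitted); all paths are placeholders:

hadoop jar job-one.jar SomeJob /path/to/input /path/to/output /path/to/cache-file

The three arguments map to args[0] (the comma separated input directories), args[1] (the output directory), and args[2] (the file registered in the DistributedCache).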
You can certainly use this even if you are using Java streaming. After spawning the cluster, once you have the jobflow_id, add the two job steps as follows: 'ruby elastic-mapreduce --jobflow <jobflow_id> --jar s3://somepath/job-one.jar --arg s3://somepath/input-one --arg s3://somepath/output-one --args -m,mapred.min.split.size=52880 -m,mapred.task.timeout=0' and 'ruby elastic-mapreduce --jobflow <jobflow_id> --jar s3://somepath/job-two.jar --arg s3://somepath/output-one --arg s3://somepath/output-two --args -m,mapred.min.split.size=52880 -m,mapred.task.timeout=0' – Amar 2012-11-06 22:11:42