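You can use ChainMapper and ChainReducer to pipeline mappers and reducers the way you want. Please take a look here.

Below is a snippet similar to what you would need to implement: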
    // Job 1: FirstMapper -> FirstReducer -> SecondMapper
    JobConf mapBConf = new JobConf(false);
    JobConf reduceConf = new JobConf(false);
    ChainMapper.addMapper(conf, FirstMapper.class, FirstMapperInputKey.class, FirstMapperInputValue.class,
        FirstMapperOutputKey.class, FirstMapperOutputValue.class, false, mapBConf);
    ChainReducer.setReducer(conf, FirstReducer.class, FirstMapperOutputKey.class, FirstMapperOutputValue.class,
        FirstReducerOutputKey.class, FirstReducerOutputValue.class, true, reduceConf);
    ChainReducer.addMapper(conf, SecondMapper.class, FirstReducerOutputKey.class, FirstReducerOutputValue.class,
        SecondMapperOutputKey.class, SecondMapperOutputValue.class, false, null);
    // A chain allows exactly one reducer per job: [MAP+ / REDUCE MAP*].
    // Calling ChainReducer.setReducer twice on the same conf throws IllegalStateException,
    // so SecondReducer has to go on a separate job (conf2, a hypothetical JobConf for a
    // second job that reads the first job's output).
    ChainReducer.setReducer(conf2, SecondReducer.class, SecondMapperOutputKey.class, SecondMapperOutputValue.class,
        SecondReducerOutputKey.class, SecondReducerOutputValue.class, true, reduceConf);
Alternatively, if you don't want to use multiple mappers and reducers, you can do the following:
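    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;

    // Parses "productId<TAB>salesCount" lines and emits (productId, salesCount).
    public static class ProductIndexerMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, LongWritable> {
        // Output objects reused across records to avoid per-record allocation.
        private final Text productId = new Text();
        private final LongWritable salesCount = new LongWritable();

        @Override
        public void map(LongWritable key, Text value,
                        OutputCollector<Text, LongWritable> output, Reporter reporter)
                throws IOException {
            String[] values = value.toString().split("\t");
            productId.set(values[0]);
            salesCount.set(Long.parseLong(values[1]));
            output.collect(productId, salesCount);
        }
    }

    // For each product, emits every sales count as a fraction of that product's total.
    // Its output value type (DoubleWritable) differs from the map output (LongWritable), so the
    // driver must call conf.setMapOutputValueClass(LongWritable.class) and
    // conf.setOutputValueClass(DoubleWritable.class); it also cannot be used as a combiner.
    public static class ProductIndexerReducer extends MapReduceBase
            implements Reducer<Text, LongWritable, Text, DoubleWritable> {
        private final DoubleWritable fraction = new DoubleWritable();

        @Override
        public void reduce(Text key, Iterator<LongWritable> values,
                           OutputCollector<Text, DoubleWritable> output, Reporter reporter)
                throws IOException {
            // Hadoop reuses one LongWritable instance for every value, so copy the
            // primitive long instead of storing the object reference.
            List<Long> items = new ArrayList<Long>();
            long total = 0;
            while (values.hasNext()) {
                long item = values.next().get();
                total += item;
                items.add(item);
            }
            for (long item : items) {
                // Floating-point division: long / long would truncate to 0.
                fraction.set((double) item / total);
                output.collect(key, fraction);
            }
        }
    }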
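The reducer buffers the values it reads from the iterator before dividing. Hadoop reuses the same value object across `next()` calls, so buffering the objects instead of copying each `long` silently loses data, and `long / long` truncates. The following minimal plain-Java sketch shows both effects without a Hadoop installation. `MutableLong` and `reusingIterator` are hypothetical stand-ins that imitate that object reuse; they are not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ReducerReuseDemo {

    // Hypothetical stand-in for LongWritable: a mutable box around a long.
    static final class MutableLong {
        long value;
        long get() { return value; }
    }

    // Imitates Hadoop's values iterator: every next() refills and returns the SAME object.
    static Iterator<MutableLong> reusingIterator(long[] data) {
        MutableLong shared = new MutableLong();
        return new Iterator<MutableLong>() {
            int i = 0;
            public boolean hasNext() { return i < data.length; }
            public MutableLong next() { shared.value = data[i++]; return shared; }
        };
    }

    // Buggy pattern: buffers the reused object itself, then reads the buffer back.
    static List<Long> storeReferences(Iterator<MutableLong> values) {
        List<MutableLong> items = new ArrayList<>();
        while (values.hasNext()) {
            items.add(values.next());
        }
        List<Long> seen = new ArrayList<>();
        for (MutableLong item : items) {
            seen.add(item.get()); // every entry reflects the LAST value read
        }
        return seen;
    }

    // Corrected pattern: copy the primitive long, then divide as double.
    static List<Double> fractions(Iterator<MutableLong> values) {
        List<Long> items = new ArrayList<>();
        long total = 0;
        while (values.hasNext()) {
            long item = values.next().get();
            total += item;
            items.add(item);
        }
        List<Double> result = new ArrayList<>();
        for (long item : items) {
            result.add((double) item / total);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] counts = {23, 33, 22, 2};
        System.out.println(storeReferences(reusingIterator(counts))); // [2, 2, 2, 2]
        System.out.println(fractions(reusingIterator(counts)));
    }
}
```

Buffering the objects yields the last value four times. Copying the primitives yields each count's share of the total (80 here).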
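The job chaining @shazin mentioned is not needed in this case. For many algorithms like this one, see [Data-Intensive Text Processing with MapReduce](http://lintool.github.com/MapReduceAlgorithms/index.html). It will help you learn to think the MapReduce way.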
Vinu, could you give an example? Suppose the input file is:

    1 ---> 23
    1 ---> 33
    1 ---> 22
    1 ---> 2
    2 ---> 3
    2 ---> 4
    2 ---> 5

Is this your expected output?

    1 ---> (23 + 33 + 22 + 2) / (23 + 33 + 22 + 2 + 3 + 4 + 5)
    2 ---> (3 + 4 + 5) / (23 + 33 + 22 + 2 + 3 + 4 + 5)

Is that what you are asking?
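One way to produce the output asked about above is the "order inversion" pattern from the Data-Intensive Text Processing with MapReduce book mentioned earlier. Mappers also emit every value under a special key that sorts before all real keys, so a single reducer sees the global total before it processes any product. The sketch below simulates this in plain Java rather than Hadoop. A `TreeMap` plays the role of shuffle-and-sort, and `TOTAL_KEY`, `mapAndShuffle`, and `reduce` are hypothetical names. It assumes a single reducer. With several reducers, the special key's partial totals would have to be routed to every partition, for example with a custom `Partitioner`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class OrderInversionSketch {

    // Special key that sorts before every real key ('*' is 42 in ASCII, '0' is 48).
    static final String TOTAL_KEY = "*";

    // "Map" phase: for each "key ---> value" line emit (key, value) and (TOTAL_KEY, value).
    // The TreeMap stands in for the shuffle: it groups values by key in sorted key order.
    static TreeMap<String, List<Long>> mapAndShuffle(List<String> lines) {
        TreeMap<String, List<Long>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.split("--->");
            String key = parts[0].trim();
            long value = Long.parseLong(parts[1].trim());
            grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
            grouped.computeIfAbsent(TOTAL_KEY, k -> new ArrayList<>()).add(value);
        }
        return grouped;
    }

    // "Reduce" phase (single reducer): TOTAL_KEY arrives first, so the global total
    // is already known when each real key's sum is divided by it.
    static Map<String, Double> reduce(TreeMap<String, List<Long>> grouped) {
        Map<String, Double> result = new LinkedHashMap<>();
        long globalTotal = 0;
        for (Map.Entry<String, List<Long>> entry : grouped.entrySet()) {
            long sum = 0;
            for (long v : entry.getValue()) {
                sum += v;
            }
            if (entry.getKey().equals(TOTAL_KEY)) {
                globalTotal = sum;
            } else {
                result.put(entry.getKey(), (double) sum / globalTotal);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
            "1 ---> 23", "1 ---> 33", "1 ---> 22", "1 ---> 2",
            "2 ---> 3", "2 ---> 4", "2 ---> 5");
        System.out.println(reduce(mapAndShuffle(input)));
    }
}
```

For the sample input, key 1 maps to 80/92 and key 2 maps to 12/92. These match the two expressions in the comment above.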