Input of the reduce phase is not what I expect in Hadoop (Java)

I'm working on a very simple graph-analysis tool using MapReduce in Hadoop. I have a graph that looks like this (each line represents an edge; in fact, it is a triangle graph):
1 3
3 1
3 2
2 3
Now I want to count the triangles in this graph using MapReduce (there is obviously one). The implementation is still a work in progress: in the first phase, I'm trying to build a list of all the neighbours of each vertex.
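To make the intended dataflow of that first phase concrete, here is a plain-Java sketch (no Hadoop involved; the class and method names are made up for illustration) of what the map and reduce steps are meant to compute on the four sample edges:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FirstPhaseSketch {

    // "Map phase": keep only edges where n1 > n2 and group the smaller
    // endpoints by the larger one, so each undirected edge appears once.
    static Map<Integer, List<Integer>> group(int[][] edges) {
        Map<Integer, List<Integer>> grouped = new TreeMap<>();
        for (int[] e : edges) {
            if (e[0] > e[1]) {
                grouped.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
            }
        }
        return grouped;
    }

    public static void main(String[] args) {
        int[][] edges = { {1, 3}, {3, 1}, {3, 2}, {2, 3} };

        Map<Integer, List<Integer>> grouped = group(edges);
        System.out.println(grouped); // {3=[1, 2]}

        // "Reduce phase": emit all ordered pairs of neighbours per key.
        for (Map.Entry<Integer, List<Integer>> entry : grouped.entrySet()) {
            for (int a : entry.getValue()) {
                for (int b : entry.getValue()) {
                    System.out.println(entry.getKey() + "\t" + a + " " + b);
                }
            }
        }
    }
}
```

So for vertex 3 the reducer should see the neighbour list [1, 2], which is the expectation the question below is about.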
My main class looks like this:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class TriangleCount {

    public static void main(String[] args) throws Exception {
        // remove the old output directory
        FileSystem fs = FileSystem.get(new Configuration());
        fs.delete(new Path("output/"), true);

        JobConf firstPhaseJob = new JobConf(FirstPhase.class);
        firstPhaseJob.setOutputKeyClass(IntWritable.class);
        firstPhaseJob.setOutputValueClass(IntWritable.class);
        firstPhaseJob.setMapperClass(FirstPhase.Map.class);
        firstPhaseJob.setCombinerClass(FirstPhase.Reduce.class);
        firstPhaseJob.setReducerClass(FirstPhase.Reduce.class);

        FileInputFormat.setInputPaths(firstPhaseJob, new Path("input/"));
        FileOutputFormat.setOutputPath(firstPhaseJob, new Path("output/"));

        JobClient.runJob(firstPhaseJob);
    }
}
My mapper and reducer implementations look like this; they are both very simple:
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class FirstPhase {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, IntWritable> {

        @Override
        public void map(LongWritable longWritable, Text graphLine, OutputCollector<IntWritable, IntWritable> outputCollector, Reporter reporter) throws IOException {
            StringTokenizer tokenizer = new StringTokenizer(graphLine.toString());
            int n1 = Integer.parseInt(tokenizer.nextToken());
            int n2 = Integer.parseInt(tokenizer.nextToken());
            // emit each undirected edge only once, keyed by the larger endpoint
            if (n1 > n2) {
                System.out.println("emitting (" + new IntWritable(n1) + ", " + new IntWritable(n2) + ")");
                outputCollector.collect(new IntWritable(n1), new IntWritable(n2));
            }
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<IntWritable, IntWritable, IntWritable, Text> {

        @Override
        public void reduce(IntWritable key, Iterator<IntWritable> iterator, OutputCollector<IntWritable, Text> outputCollector, Reporter reporter) throws IOException {
            List<IntWritable> nNodes = new ArrayList<>();
            while (iterator.hasNext()) {
                nNodes.add(iterator.next());
            }
            System.out.println("key: " + key + ", list: " + nNodes);

            // create pairs and emit these
            for (IntWritable n1 : nNodes) {
                for (IntWritable n2 : nNodes) {
                    outputCollector.collect(key, new Text(n1.toString() + " " + n2.toString()));
                }
            }
        }
    }
}
I've added some logging to the program. During the map phase, I print the pairs I'm emitting. During the reduce phase, I print the input of reduce. I get the following output:
emitting (3, 1)
emitting (3, 2)
key: 3, list: [1, 1]
The input of the reduce function is not what I expect. I expect it to be [1, 2] instead of [1, 1]. I believe that Hadoop automatically groups all the pairs emitted by the map phase, but am I missing something here? Any help or explanation would be appreciated.
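For what it's worth, a common Hadoop pitfall that produces exactly this symptom (an assumption here, not something the question confirms): the framework reuses a single `IntWritable` instance for every value handed to `reduce`, so storing the iterator's objects in a list leaves every slot pointing at the same instance, and the list prints one value repeated. The effect can be reproduced in plain Java with a hypothetical mutable holder standing in for `IntWritable`:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ReuseDemo {

    // Minimal stand-in for Hadoop's IntWritable: a mutable int holder.
    static class Holder {
        int value;
        Holder(int value) { this.value = value; }
        @Override public String toString() { return Integer.toString(value); }
    }

    // Mimics Hadoop's value iterator: it reuses ONE Holder instance
    // and only overwrites its contents on each next() call.
    static Iterator<Holder> reusingIterator(int... values) {
        Holder shared = new Holder(0);
        return new Iterator<Holder>() {
            int i = 0;
            public boolean hasNext() { return i < values.length; }
            public Holder next() { shared.value = values[i++]; return shared; }
        };
    }

    // Storing the references directly: every element is the same object,
    // so after iteration the list shows the LAST value repeated.
    static List<Holder> collectByReference(Iterator<Holder> it) {
        List<Holder> out = new ArrayList<>();
        while (it.hasNext()) { out.add(it.next()); }
        return out;
    }

    // Copying the int out of the shared holder preserves both values.
    static List<Integer> collectByValue(Iterator<Holder> it) {
        List<Integer> out = new ArrayList<>();
        while (it.hasNext()) { out.add(it.next().value); }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collectByReference(reusingIterator(1, 2))); // [2, 2]
        System.out.println(collectByValue(reusingIterator(1, 2)));     // [1, 2]
    }
}
```

Under this assumption, the fix in the reducer would be to copy each value, e.g. `nNodes.add(new IntWritable(iterator.next().get()))`, instead of adding the iterator's object directly.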
Wow, such a simple thing. Thanks for the reference and the answer! – Devos50