2015-06-17 26 views
1

我在Hadoop中的MapReduce使用工作在一個非常簡單的圖形分析工具。我有一個圖表,如下所示(每一行代表和邊緣 - 事實上,這是一個三角形圖表):輸入是不是我所期望Hadoop中(Java)的

1 3 
3 1 
3 2 
2 3 

現在,我想用的MapReduce計算三角形在該圖中(顯然一個)。據工作仍然在進行中,並在第一階段,我試圖讓所有的鄰居爲每個頂點的列表。

我的主類如下所示:

public class TriangleCount { 

    public static void main(String[] args) throws Exception { 

     // remove the old output directory 
     FileSystem fs = FileSystem.get(new Configuration()); 
     fs.delete(new Path("output/"), true); 

     JobConf firstPhaseJob = new JobConf(FirstPhase.class); 

     firstPhaseJob.setOutputKeyClass(IntWritable.class); 
     firstPhaseJob.setOutputValueClass(IntWritable.class); 

     firstPhaseJob.setMapperClass(FirstPhase.Map.class); 
     firstPhaseJob.setCombinerClass(FirstPhase.Reduce.class); 
     firstPhaseJob.setReducerClass(FirstPhase.Reduce.class); 

     FileInputFormat.setInputPaths(firstPhaseJob, new Path("input/")); 
     FileOutputFormat.setOutputPath(firstPhaseJob, new Path("output/")); 

     JobClient.runJob(firstPhaseJob); 
    } 
} 

我的映射器和減速器實現這個樣子,他們都非常簡單:

public class FirstPhase { 

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, IntWritable> { 

     @Override 
     public void map(LongWritable longWritable, Text graphLine, OutputCollector<IntWritable, IntWritable> outputCollector, Reporter reporter) throws IOException { 
      StringTokenizer tokenizer = new StringTokenizer(graphLine.toString()); 
      int n1 = Integer.parseInt(tokenizer.nextToken()); 
      int n2 = Integer.parseInt(tokenizer.nextToken()); 
      if(n1 > n2) { 
       System.out.println("emitting (" + new IntWritable(n1) + ", " + new IntWritable(n2) + ")"); 
       outputCollector.collect(new IntWritable(n1), new IntWritable(n2)); 
      } 
     } 
    } 

    public static class Reduce extends MapReduceBase implements Reducer<IntWritable, IntWritable, IntWritable, Text> { 

     @Override 
     public void reduce(IntWritable key, Iterator<IntWritable> iterator, OutputCollector<IntWritable, Text> outputCollector, Reporter reporter) throws IOException { 
      List<IntWritable> nNodes = new ArrayList<>(); 
      while(iterator.hasNext()) { 
       nNodes.add(iterator.next()); 
      } 

      System.out.println("key: " + key + ", list: " + nNodes); 

      // create pairs and emit these 
      for(IntWritable n1 : nNodes) { 
       for(IntWritable n2 : nNodes) { 
        outputCollector.collect(key, new Text(n1.toString() + " " + n2.toString())); 
       } 
      } 
     } 
    } 
} 

我已經添加了一些記錄到程序。在地圖階段,我打印出我正在發射的那一對。在縮小階段,我打印縮小的輸入。我得到以下輸出:

emitting (3, 1) 
emitting (3, 2) 
key: 3, list: [1, 1] 

reduce函數的輸入不是我所期望的。我期望它是[1,2]而不是[1,1]。我相信Hadoop會自動將來自地圖階段輸出的所有發射對組合起來,但我在這裏錯過了什麼嗎?任何幫助或解釋將不勝感激。

回答

2

這是用Hadoop MapReduce的人開始一個典型的問題。

的問題是在你的減速。當通過給定Iterator<IntWritable>循環,每個IntWritable實例重複使用,所以它只能在給定的時間,會讓周圍的一個實例。

這意味着,當你調用iterator.next()您第一次保存IntWritable實例設置新的值。

你可以閱讀更多關於這個問題在這裏
https://cornercases.wordpress.com/2011/08/18/hadoop-object-reuse-pitfall-all-my-reducer-values-are-the-same/

+0

哇這麼簡單的事。感謝您的參考和答案! – Devos50

相關問題