2010-10-01 59 views
4

嗨 Reduce階段中的文本操作似乎無法正常工作。 我懷疑問題可能在我的代碼,而不是hadoop本身,但你永遠不知道... 如果你能發現任何陷阱讓我知道。 我浪費了一天的時間,試圖找出這段代碼有什麼問題。Hadoop減速器字符串操作不起作用

我的樣本輸入文件名爲simple.psv

12345 [email protected]|m|1975 
12346 [email protected]|m|1981 

我的映射器和減速機代碼

package simplemapreduce; 

import java.io.IOException; 
import java.util.Iterator; 
import java.util.StringTokenizer; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.FileInputFormat; 
import org.apache.hadoop.mapred.FileOutputFormat; 
import org.apache.hadoop.mapred.JobClient; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.MapReduceBase; 
import org.apache.hadoop.mapred.Mapper; 
import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.Reducer; 
import org.apache.hadoop.mapred.Reporter; 
import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator; 

/** 
* 
* @author 
*/ 
public class Main { 


    public static class SimpleMap extends MapReduceBase implements 
      Mapper<LongWritable, Text, Text, Text> { 

     public void map(LongWritable key, Text inputs, 
       OutputCollector<Text, Text> output, Reporter reporter) 
       throws IOException { 

      String inputString = inputs.toString(); 
      //System.out.println("CRM Map record:"+inputString); 
      StringTokenizer tokenizer = new StringTokenizer(inputString); 
      Text userid = new Text(); 
      if (tokenizer.hasMoreTokens()) { 
       userid.set(tokenizer.nextToken()); 
       Text data = new Text(); 
       if (tokenizer.hasMoreTokens()) { 
        data.set(tokenizer.nextToken()); 
       } else { 
        data.set(""); 
       } 
       output.collect(userid, data); 
      } 
     } 
    } 

    /** 
    * A reducer class that just emits its input. 
    */ 
    public static class SimpleReduce extends MapReduceBase implements 
      Reducer<Text, Text, Text, Text> { 

     public void reduce(Text key, Iterator<Text> values, 
       OutputCollector<Text, Text> output, Reporter reporter) 
       throws IOException { 

      while (values.hasNext()) { 
       Text txt = values.next(); 
       String inputString = "<start>" + txt.toString() + "<end>"; 
       Text out = new Text(); 
       out.set(inputString); 
       //System.out.println(inputString); 
       output.collect(key, out); 

      } 
     } 
    } 

    public static void main(String[] args) throws IOException { 

     if (args.length != 2) { 
      System.err.println("Usage: SimpleMapReduce <input path> <output path>"); 
      System.exit(1); 
     } 
     JobConf conf = new JobConf(Main.class); 
     conf.setJobName("Simple Map reducer"); 

     FileInputFormat.setInputPaths(conf, new Path(args[0])); 
     FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
     conf.setMapperClass(SimpleMap.class); 
     conf.setCombinerClass(SimpleReduce.class); 
     conf.setReducerClass(SimpleReduce.class); 
     conf.setOutputKeyClass(Text.class); 
     conf.setOutputValueClass(Text.class); 
     conf.setNumReduceTasks(1); 
     JobClient.runJob(conf); 
    } 
} 

我的樣本啓動腳本調用simple.sh

#!/bin/bash 

hadoop jar SimpleMapReduce.jar \ 
    /full/path/to/input/simple.tsv /user/joebloggs/output 

預期輸出

12345 <start>[email protected]|m|1975<end> 
12346 <start>[email protected]|m|1981<end> 

實際輸出

12345 <start><start>[email protected]|m|1975<end><end> 
12346 <start><start>[email protected]|m|1981<end><end> 

我在Linux上測試 這在Amazon S3,以及如果你能發現這個問題,讓我知道它是什麼......真的會節省一些頭髮在我頭上!

回答

4

數據通過系統的基本流程是:

Input -> Map -> Reduce -> output. 

作爲組合器已被添加到允許計算機(Hadoop集羣在多箇中的一個)的一種性能優化做的局部聚集將數據傳輸到實際減速器運行的系統之前。

在字數例如它是好的開始使用這些值:

1 1 1 1 1 1 1 1 1 1 

它們組合成

3 4 2 1 

和他們減少到最終的結果

10 

所以組合器本質上是一個性能優化。 如果你沒有指定組合器,它不會改變通過的信息(即它是一個「身份縮減器」)。 因此,如果數據集保持有效,那麼只能使用SAME類作爲組合器和簡化器。在你的情況:這不是真的 - >你的數據現在是無效的。

你這樣做:

conf.setCombinerClass(SimpleReduce.class); 
conf.setReducerClass(SimpleReduce.class); 

因此,這使你的映射器的輸出通過你的減速機的兩倍。 第一個加上:「開始」&「結束」 第二個再加上「開始」&「結束」。

簡單的解決方案:

// conf.setCombinerClass(SimpleReduce.class); 
conf.setReducerClass(SimpleReduce.class); 

HTH

2

我有一個問題,其中減速不會得到所有由映射器發送的數據。減速器只能達到特定的部分output.collect會發出。例如。 輸入數據:

12345 [email protected]|m|1975
12346 [email protected]|m|1981

如果我說

輸出.collect(鍵,mail_id);

然後它不會得到接下來的兩個領域 - 性別和出生年份。

// conf.setCombinerClass(SimpleReduce.class);

解決了這個問題。