2013-04-11 52 views
0

我正在執行map reduce中的加入操作。我正在將兩個文件的值用分隔符(逗號)分隔。通過對一個公共實體執行連接操作,我可以從兩個輸入文件中的一個文件中獲得輸出。mapreduce中加入操作的輸出

這裏是地圖減少代碼:

public class EmpMapReduce { 
public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, Text>  
     { 
     public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException 
       { 
     String tokens [] = value.toString().split(","); 
     String empid = tokens[0]; 
     String val = ""; 
     if(tokens.length != 0) 
     { 
      for (int cnt = 1; cnt < tokens.length; cnt++) 
       {  
       val = val + tokens[cnt] + "\t"; 
      } 
     } 

     context.write(new Text(empid), new Text(val)); 

    } 
    } 

    public static class MyReducer extends Reducer<Text, Text, Text, Text> 
     { 
    public void reduce(Text key, Iterable<Text> values, 
      Context context) throws IOException, InterruptedException 
       { 
      String str = ""; 
     for (Text val : values) 
        { 
      str = str + val.toString() + "\t"; 
     } 

     context.write(key, new Text (str)); 

    } 
    } 

    public static void main(String[] args) throws Exception 
     { 
    Configuration conf = new Configuration(); 
    String[] otherArgs = new GenericOptionsParser(conf, args) 
      .getRemainingArgs(); 
    if (otherArgs.length != 3) { 
     System.err.println("Usage: EmpMapReduce <in1> <in2> <out>"); 
     System.exit(2); 
    } 
    Job job = new Job(conf, "EmpMapReduce"); 



    job.setJarByClass(EmpMapReduce.class); 

    job.setMapperClass(TokenizerMapper.class); 
    job.setReducerClass(MyReducer.class); 

    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(Text.class); 

    job.setInputFormatClass(TextInputFormat.class); 

    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 

    FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 
    FileInputFormat.addInputPath(job, new Path(otherArgs[1])); 
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); 

    System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
    } 

這裏有兩個輸入文件內容我用:

100,name100,10 
101,name101,11 
102,name102,12 
103,name103,13 
104,name104,14 
105,name105,15 
106,name106,16 
107,name107,17 

第二個輸入文件:

100,100000 
101,200000 
102,300000 
103,400000 
104,500000 
105,600000 
106,700000 
107,800000 

我得到以下的輸出:

100,name100,10,100000 
101,200000,name101,11 
102,name102,12,300000 
103,400000,name103,13 
104,name104,14,500000 
105,600000,name105,15 
106,name106,16,700000 
107,800000,name107,17 

現在我擔心的是,爲什麼我收到這樣的輸出:

100,name100,10,100000 
101,200000,name101,11 

即在第一行數據首先從一個輸入文件,並比其他複製。但是對於第二排,反之亦然。我無法弄清楚如何在每一行中使數據的順序相同。

的另一個問題是:

有一次,我在所有行中的數據按特定的順序不是我怎麼能執行各種opertaions,如:更換name100 ---> somenewname或在每一行新的逗號分隔值的末尾添加具有該行先前所有值的總和。

回答

0

從兩個映射器輸出到達縮減器的順序未指定。所以你需要一些方法在減速器中識別它們。

一個簡單的解決方案是:

  • 有兩個映射器,一個用於每個輸入
  • 每個映射器輸出的值「[類型]:[的值其餘]」
  • 說你有兩種類型(用戶,交易),現在每個都被識別。
  • 現在在你的減速(抱歉僞代碼):

void reduce(..) { 
    String user = ""; 
    String trans = ""; 

    for(value: values) { 
    (type, payload) = value.split(); 
    if (type == "user") user = payload; 
    if (type == "transaction") transaction = payload; 
    } 

    context.write(user + "\t" + transaction); 
} 
+0

我不知道爲什麼格式是如此搞砸。對不起: -/ – 2013-04-12 01:24:28

+0

在markdown中似乎存在一個錯誤,你需要在下面的代碼塊的列表後面有一個段落才能正確顯示,我現在插入了一個空段落,它看起來不錯。 – jkovacs 2013-04-12 09:27:29

+0

@MatthewRathbone:如果我爲兩個輸入使用兩個映射器,而不是我必須做的那樣'MultipleInputs.addInputPath(job,path,inputFormatClass,mapperClass);'因此,對於這種情況,我必須做一些覆蓋inputFormat類的開銷。這個小問題很複雜。糾正我,如果我想錯了,如果是的話,請發佈一個簡單的工作解決方案。提前致謝。 – user1188611 2013-04-12 14:49:43

0

對於馬修的解決方案,你可能需要把這個在循環等待中設定的所有值爲了得到正確的結果:

if(!user.equals("") && !trans.equals("")){ 
     str = str + user+ "\t" + trans+ "\t"; 
} 
+0

並且此連接不適用於一對多..... – 2013-11-06 20:18:11