2014-07-16 35 views
0

我寫了一個MapReduce程序來解析CSV的值。MapReduce - 減速器給出錯誤的輸出日期

的數據集是如下 -

PRAVEEN,40020,嬰兒,026A2,12/04/2015

PRAVEEN,40020,玩具,0383,1/04/2014

PRAVEEN ,2727272,BOOK,03383,03 /二千零十三分之一十四

PRAVEEN,22636,BIKE,7373737,12 /二千○十二分之二十四

我的地圖功能從CSV讀取第一值(即用戶名)作爲KEY和最後的值即日期作爲VALUE

我Reduce函數也很簡單,我有選擇的最後日期從值列表值的一個特定的鍵就是用戶名

的代碼如下 -

package com.test.mapreduce; 
    import java.io.IOException; 
    import java.text.ParseException; 
    import java.text.SimpleDateFormat; 
    import java.util.ArrayList; 
    import java.util.Date; 
    import java.util.HashSet; 
    import java.util.Iterator; 
    import java.util.List; 
    import java.util.Set; 

    import org.apache.hadoop.conf.Configuration; 
    import org.apache.hadoop.conf.Configured; 
    import org.apache.hadoop.fs.Path; 
    import org.apache.hadoop.io.IntWritable; 
    import org.apache.hadoop.io.LongWritable; 
    import org.apache.hadoop.io.Text; 
    import org.apache.hadoop.mapred.FileInputFormat; 
    import org.apache.hadoop.mapred.FileOutputFormat; 
    import org.apache.hadoop.mapred.JobClient; 
    import org.apache.hadoop.mapred.JobConf; 
    import org.apache.hadoop.mapred.KeyValueTextInputFormat; 
    import org.apache.hadoop.mapred.MapReduceBase; 
    import org.apache.hadoop.mapred.Mapper; 
    import org.apache.hadoop.mapred.OutputCollector; 
    import org.apache.hadoop.mapred.Reducer; 
    import org.apache.hadoop.mapred.Reporter; 
    import org.apache.hadoop.mapred.TextInputFormat; 
    import org.apache.hadoop.mapred.TextOutputFormat; 
    import org.apache.hadoop.util.Tool; 
    import org.apache.hadoop.util.ToolRunner; 





public class RetailCustomerAnalysis_2 extends Configured implements Tool { 
      public static class MapClass extends MapReduceBase 
      implements Mapper<LongWritable, Text, Text, Text> { 

     private Text key1 = new Text(); 
     private Text value1 = new Text(); 
     private int noofFields = 5; 



public void map(LongWritable key, Text value, 
       OutputCollector<Text, Text> output, 
       Reporter reporter) throws IOException { 

     String line = value.toString().replaceAll("\\s+",""); 
     String[] split = line.split(","); 


     if(split.length!=noofFields){ 
     return; 
     } 

     else { 
      key1.set(split[0].toString().trim()); 
      value1.set(split[4].toString().trim()); 
      System.out.println(split[4].toString().trim()); 
      output.collect(key1, value1); 
    } 
    } 
    } 

public static class Reduce extends MapReduceBase 
implements Reducer<Text, Text, Text, Text> { 

public void reduce(Text key, Iterator<Text> values, 
        OutputCollector<Text, Text> output, 
        Reporter reporter) throws IOException { 

    SimpleDateFormat formatter = new SimpleDateFormat("MM/dd/yyyy"); 
    Date date = new Date(); 

    List<Text> dateList = new ArrayList<Text>(); 

    for(Iterator<Text> it = values; it.hasNext();) { 
     // add the values in the arrayList 
     dateList.add((Text) it.next()); 
    } 


    if(dateList.size()==1){ //If the mapper output has only one date , then select that date 
          // as the VALUE 
    try { 
      date = formatter.parse(dateList.get(0).toString()); 
      } catch (ParseException e) { 
      e.printStackTrace(); 
     } 
    } //If part ends 

    else { 
      try { 
       date = formatter.parse(dateList.get(0).toString()); 
         //select the first date from list 
      } catch (ParseException e1) { 
       e1.printStackTrace(); 
      } 

      for(int i=0 ; i <dateList.size();++i){ 
        try { 
        //compare the selected date with the rest of the dates in the list. 
        if((formatter.parse(dateList.get(i).toString())).compareTo(date)>0){ 
         date=formatter.parse(dateList.get(i).toString()); 
         // getting the max date from the list 
         } 
        } 
        catch (ParseException e) { 
        e.printStackTrace(); 
       } 
      } //for loops ends 
    } // else part ends  

    Text value = new Text(date.toString()); 
     output.collect(key, value); 
     } 
    } 



public int run(String[] args) throws Exception { 
Configuration conf = getConf(); 

JobConf job = new JobConf(conf, RetailCustomerAnalysis_2.class); 

Path in = new Path(args[0]); 
Path out = new Path(args[1]); 
FileInputFormat.setInputPaths(job, in); 
FileOutputFormat.setOutputPath(job, out); 

job.setJobName("RetailCustomerAnalysis_2"); 
job.setMapperClass(MapClass.class); 
job.setReducerClass(Reduce.class); 

job.setInputFormat(TextInputFormat.class); 
job.setOutputFormat(TextOutputFormat.class); 
job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(Text.class); 
job.set("key.value.separator.in.input.line", ","); 

JobClient.runJob(job); 
return 0; 
    } 

public static void main(String[] args) throws Exception { 
int res = ToolRunner.run(new Configuration(), new RetailCustomerAnalysis_2(), args); 

System.exit(res); 
} 

} 

但我從列表中獲得隨機日期作爲結果。任何人都可以幫忙

回答

0

代碼大部分是正確的。減速器的實施必須稍作修改。在下面的代碼剪斷創建問題

for(Iterator<Text> it = values; it.hasNext();) { 
    // add the values in the arrayList 
    dateList.add((Text) it.next()); 
} 

在上面的代碼段中相同的值的對象在每個itreation使用的,只有它們的內容被改變。

例如,假定的MapReduce用以下輸入

PRAVEEN,4002013454,嬰兒運行,026A12,12/04/2015

PRAVEEN,4002013454,玩具,020383,1/04/2014

PRAVEEN,2727272727272,BOOK,03383,03 /二千零十三分之一十四

PRAVEEN,2263637373,BIKE,7373737,12 /二千○十二分之二十四

在減少方法 'dateList' 對象eleme ts在值的for循環完成後具有值(12/24/2012,12/24/2012,12/24/2012,12/24/2012)。這導致其餘代碼的執行不正確,最終輸出錯誤。

相反,你在地圖中更改代碼如下

public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> { 

    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { 

     SimpleDateFormat formatter = new SimpleDateFormat("MM/dd/yyyy"); 
     Date date = new Date(); 
     //-----Modified section START----------- 
     List<String> dateList = new ArrayList<String>(); 

     for(Iterator<Text> it = values; it.hasNext();) { 
      // add the values in the arrayList 
      dateList.add(((Text)it.next()).toString()); 
     } 
     //----Modified section END-------------- 
     if(dateList.size()==1){ //If the mapper output has only one date , then select that date 
      // as the VALUE 
      try { 
       date = formatter.parse(dateList.get(0).toString()); 
      } catch (ParseException e) { 
       e.printStackTrace(); 
      } 
     } //If part ends 
     else { 
      String str = dateList.get(0).toString(); 
      try { 

       date = formatter.parse(dateList.get(0).toString()); 
       //select the first date from list 
      } catch (ParseException e1) { 
       e1.printStackTrace(); 
      } 

      for(int i=0 ; i <dateList.size();++i){ 
       try { 
        //compare the selected date with the rest of the dates in the list. 
        if((formatter.parse(dateList.get(i).toString())).compareTo(date)>0){ 
         date=formatter.parse(dateList.get(i).toString()); 
         // getting the max date from the list 
        } 
       } 
       catch (ParseException e) { 
        e.printStackTrace(); 
       } 
      } //for loops ends 
     } // else part ends  

     Text value = new Text(date.toString()); 
     output.collect(key, value); 
    } 
} 

請參閱Hadoop Reducer Values in Memory?有關對象引用的詳細信息,降低方法。

+0

嗨,感謝你了,它解決了我的問題,但仍然明白它是如何工作的,因爲你幾乎沒有改變代碼片段 – Rahuul

+0

@Rahul,問題在於對象引用。在向'dateList'中添加'Text'對象的代碼中,所有Text對象都指向堆中的單個內存位置。 (這是因爲MapReduce框架重用了對象並只改變了狀態)。在最後的創作過程中,Text對象值設置爲2012年12月24日。這意味着dateList中的所有元素的值都是2012年12月24日。 – pranabh

+0

感謝您的解釋。 :)但隨着越野車片段,我從列表中獲得第一個值作爲「結果」日期。 – Rahuul