
I am new to Hadoop and have been experimenting with a few basic MapReduce programs. I have run into a peculiar problem: my mapper output appears to be written directly to the output file, bypassing the reducer entirely.

The code is as follows:

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
import java.util.HashMap; 
import java.util.Iterator; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.hadoop.util.GenericOptionsParser; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 



class WeatherRecord implements Writable { 
    public DoubleWritable maxSum; // running sum of TMAX records 
    public IntWritable maxCount; // running count of TMAX records 
    public DoubleWritable minSum; // running sum of TMIN records 
    public IntWritable minCount; // running count of TMIN records 

    // default constructor 
    public WeatherRecord(){ 

     maxSum = new DoubleWritable(); 
     maxCount= new IntWritable(); 
     minSum = new DoubleWritable(); 
     minCount = new IntWritable(); 
    } 

    // custom constructor 
    public WeatherRecord(DoubleWritable ms, IntWritable mc, DoubleWritable ms1, IntWritable mc1){ 
     maxSum = ms; 
     maxCount= mc; 
     minSum = ms1; 
     minCount = mc1; 
    } 

    /* Getter and setter Methods*/ 



    //method to get running total of temperature 
    public double getMaxSum(){ 
     return Double.parseDouble(maxSum.toString()); 
    } 

    //method to get running total of temperature 
    public double getMinSum(){ 
     return Double.parseDouble(minSum.toString()); 
    } 

    //method to get Count 
    public int getMaxCount(){ return Integer.parseInt(maxCount.toString());} 

    //method to get Count 
    public int getMinCount(){ return Integer.parseInt(minCount.toString());} 

    // method to set count 
    public void setMaxCount(int c){ 
     maxCount = new IntWritable(c); 
    } 

    // method to set count 
    public void setMinCount(int c){ 
     minCount = new IntWritable(c); 
    } 

    //method to set reading sum 
    public void setMaxSum(double r){ 
     maxSum = new DoubleWritable(r); 
    } 

    //method to set reading sum 
    public void setMinSum(double r){ 
     minSum = new DoubleWritable(r); 
    } 

    // method to serialize object 
    public void write(DataOutput dataOutput) throws IOException { 
     maxSum.write(dataOutput); 
     maxCount.write(dataOutput); 
     minSum.write(dataOutput); 
     minCount.write(dataOutput); 
    } 

    //method to deserialize object 
    public void readFields(DataInput dataInput) throws IOException { 
     maxSum.readFields(dataInput); 
     maxCount.readFields(dataInput); 
     minSum.readFields(dataInput); 
     minCount.readFields(dataInput); 
    } 
} 


public class WeatherDriver extends Configured implements Tool{ 

    public static class WeatherMap extends Mapper<LongWritable, Text, Text,WeatherRecord > { 

     HashMap<String,WeatherRecord> recordMap= new HashMap<String,WeatherRecord>(); 

     protected void map(LongWritable key, Text value, Mapper.Context context) { 
      //the individual records from csv file is split based on ',' 
      String[] record = value.toString().split(","); 

      //station-id is the first field in the file 
      String stationId = record[0]; 

      //record-type(TMAX,TMIN,..) is the third field in the csv file 
      String type = record[2]; 

      //temperature readings are fourth column in the csv file 
      double temperature = Double.parseDouble(record[3]); 


      if(type.equalsIgnoreCase("TMAX") || type.equalsIgnoreCase("TMIN")){ 

       if(recordMap.containsKey(stationId)){ 
        WeatherRecord w = recordMap.get(stationId); 
        if(type.equalsIgnoreCase("TMAX")){ 
         w.setMaxCount(1 + w.getMaxCount()); 
         w.setMaxSum(w.getMaxSum() + temperature); 
        } 
        else if(type.equalsIgnoreCase("TMIN")){ 
         w.setMinCount(1+w.getMinCount()); 
         w.setMinSum(w.getMinSum() + temperature); 
        } 
        recordMap.put(stationId,w); 
       } 
       else{ 
        if(type.equalsIgnoreCase("TMAX")){ 
         recordMap.put(stationId, new WeatherRecord(new DoubleWritable(temperature), new IntWritable(1), 
           new DoubleWritable(0), new IntWritable(0))); 
        } 
        else if(type.equalsIgnoreCase("TMIN")){ 
         recordMap.put(stationId, new WeatherRecord(new DoubleWritable(0), new IntWritable(0), 
           new DoubleWritable(temperature), new IntWritable(1))); 
        } 

       } 
      } 

     } // end of map method 

     protected void cleanup(Context context) throws IOException, InterruptedException { 
      Iterator i = recordMap.keySet().iterator(); 
      String stationId=""; 
      while(i.hasNext()){ 
       stationId = i.next().toString(); 

       context.write(new Text(stationId),recordMap.get(stationId)); 
      } 
     } // end of cleanup 
    } // end of mapper class 


    public static class WeatherReduce extends Reducer<Text, WeatherRecord, Text, Text> { 

     protected void reduce(Text key, Iterator<WeatherRecord> values, Reducer<Text, WeatherRecord, Text, Text>.Context context) throws IOException, InterruptedException { 
      // initializing local variables to compute average 
      int maxCount =0; 
      int minCount=0; 
      double maxSum=0; 
      double minSum=0; 



      //iterating over list of values to compute average 
      while(values.hasNext()){ 
       WeatherRecord record = values.next(); 

       maxSum += Double.parseDouble(record.maxSum.toString()); 
       maxCount += Integer.parseInt(record.maxCount.toString()); 
       minSum += Double.parseDouble(record.minSum.toString()); 
       minCount+=Integer.parseInt(record.minCount.toString()); 

      } 

      // logic to handle divide by zero case 

      if(minCount==0){ 
       minCount=1; 
      } 
      if(maxCount==0){ 
       maxCount=1; 
      } 

      System.out.println("Min Sum is" + minSum); 

      context.write(new Text(key), new Text(","+(minSum/minCount)+","+(maxSum/maxCount))); 


     } 
    } 

    @Override 
    public int run(String[] args) throws Exception { 
     Configuration conf = new Configuration(); 
     args = new GenericOptionsParser(conf, args).getRemainingArgs(); 
     String input = args[0]; 
     String output = args[1]; 

     Job job = new Job(conf, "weather average"); 
     job.setJarByClass(WeatherMap.class); 
     job.setInputFormatClass(TextInputFormat.class); 
     job.setMapperClass(WeatherMap.class); 
     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(WeatherRecord.class); 

     job.setReducerClass(WeatherReduce.class); 
     job.setOutputFormatClass(TextOutputFormat.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 

     FileInputFormat.setInputPaths(job, new Path(input)); 
     Path outPath = new Path(output); 
     FileOutputFormat.setOutputPath(job, outPath); 
     outPath.getFileSystem(conf).delete(outPath, true); 

     job.waitForCompletion(true); 
     return (job.waitForCompletion(true) ? 0 : 1); 
    } 

    public static void main(String[] args) throws Exception { 
     int exitCode = ToolRunner.run(new WeatherDriver(), args); 
     System.exit(exitCode); 
    } 
} 

The expected output is of the form station_id,average_min_temp,average_max_temp:

AGE00135039,123.12,11 

However, I get the output below instead. Analyzing the code, I found that the context.write calls from the mapper are being written directly to the output file:

AGE00135039 WeatherRecord@&lt;hashcode&gt;
AGE00135039 WeatherRecord@&lt;hashcode&gt;
AGE00135039 WeatherRecord@&lt;hashcode&gt;
AGE00135039 WeatherRecord@&lt;hashcode&gt;
AGE00135039 WeatherRecord@&lt;hashcode&gt;
AGE00135039 WeatherRecord@&lt;hashcode&gt;
AGE00135039 WeatherRecord@&lt;hashcode&gt;
AGE00135039 WeatherRecord@&lt;hashcode&gt;
AGE00135039 WeatherRecord@&lt;hashcode&gt;
AGE00147704 WeatherRecord@&lt;hashcode&gt;
AGE00147704 WeatherRecord@&lt;hashcode&gt;
AGE00147704 WeatherRecord@&lt;hashcode&gt;
AGE00147704 WeatherRecord@&lt;hashcode&gt;
AGE00147704 WeatherRecord@&lt;hashcode&gt;
AGE00147704 WeatherRecord@&lt;hashcode&gt;

Answer


Your job is most likely invoking the default reduce() method of the base Reducer class, because the signature of your reduce() method is incorrect. You have:

protected void reduce(Text key, Iterator<WeatherRecord> values, Context context)

It should look like:

protected void reduce(Text key, Iterable<WeatherRecord> values, Context context)

Note the change from Iterator to Iterable.

One way to catch and avoid this kind of mistake is to add the @Override annotation to any method you believe overrides a base implementation; if it does not, you get a compile-time error.
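As a minimal sketch (assuming the mapper's output types stay Text/WeatherRecord as in your code), the corrected reducer could look like this; with @Override in place, the Iterator version would no longer compile:

public static class WeatherReduce extends Reducer<Text, WeatherRecord, Text, Text> {

    @Override  // fails to compile if the signature does not match Reducer.reduce()
    protected void reduce(Text key, Iterable<WeatherRecord> values, Context context)
            throws IOException, InterruptedException {
        int maxCount = 0, minCount = 0;
        double maxSum = 0, minSum = 0;

        // Iterable allows a for-each loop over the grouped values
        for (WeatherRecord record : values) {
            maxSum += record.getMaxSum();
            maxCount += record.getMaxCount();
            minSum += record.getMinSum();
            minCount += record.getMinCount();
        }

        // guard against divide-by-zero, as in the original code
        if (minCount == 0) minCount = 1;
        if (maxCount == 0) maxCount = 1;

        context.write(key, new Text("," + (minSum / minCount) + "," + (maxSum / maxCount)));
    }
}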


That worked. Thanks for your help :-)