2017-06-15

MapReduce code to find the maximum temperature for each city

Problem statement: find the maximum temperature for each city using MapReduce.

Input:

Kolkata,56 
Jaipur,45 
Delhi,43 
Mumbai,34 
Goa,45 
Kolkata,35 
Jaipur,34 
Delhi,32 

Output:

Kolkata 56 
Jaipur 45 
Delhi  43 
Mumbai 34 

I wrote the following code:

Mapper:

import java.io.IOException; 
import java.util.StringTokenizer; 

import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 

public class Map 
     extends Mapper<LongWritable, Text, Text, IntWritable>{ 

    private IntWritable max = new IntWritable(); 
    private Text word = new Text(); 

    @Override 
    protected void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException { 

     StringTokenizer line = new StringTokenizer(value.toString(),",\t"); 

     word.set(line.nextToken()); 
     max.set(Integer.parseInt(line.nextToken())); 

     context.write(word,max);    

     } 
    } 

Reducer:

import java.io.IOException; 
import java.util.Iterator; 

import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 

public class Reduce 
     extends Reducer<Text, IntWritable, Text, IntWritable>{ 

    @Override 
    protected void reduce(Text key, Iterable<IntWritable> values, 
      Context context) 
      throws IOException, InterruptedException { 

     // Keep the running maximum in a local variable so it starts fresh for 
     // every key; as an instance field it would carry over between cities. 
     int maxTemp = Integer.MIN_VALUE; 

     Iterator<IntWritable> itr = values.iterator(); 

     while (itr.hasNext()) { 
      maxTemp = Math.max(maxTemp, itr.next().get()); 
     } 

     context.write(key, new IntWritable(maxTemp)); 
    } 
} 

Driver class:

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class MaxTempDriver { 
    public static void main(String[] args) throws Exception {   

     // Create a new job 
     Job job = new Job(); 

     // Set the jar that contains the job classes, and give the job a name 
     job.setJarByClass(MaxTempDriver.class); 
     job.setJobName("Max Temperature"); 

     // Set input and output Path, note that we use the default input format 
     // which is TextInputFormat (each record is a line of input) 
     FileInputFormat.addInputPath(job, new Path(args[0])); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 

     // Set Mapper and Reducer class 
     job.setMapperClass(Map.class); 
     job.setCombinerClass(Reduce.class); 
     job.setReducerClass(Reduce.class); 

     // Set Output key and value 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(IntWritable.class); 

     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
} 

I am getting the following error:

17/06/15 10:44:17 INFO mapred.JobClient: Task Id : 
attempt_201706151011_0002_m_000000_1, Status : FAILED 
java.util.NoSuchElementException 
at java.util.StringTokenizer.nextToken(StringTokenizer.java:349) 
at Map.map(Map.java:23) 
at Map.map(Map.java:1) 
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144) 
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764) 
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370) 
at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
at java.security.AccessController.doPrivileged(Native Method) 
at javax.security.auth.Subject.doAs(Subject.java:422) 
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121) 
at org.apache.hadoop.mapred.Child.main(Child.java:249) 

As you can see, I am getting a java.util.NoSuchElementException in the map function. Please help me resolve this exception and suggest how to modify the map() code.

Answer


Check that the next token exists before reading it:

@Override 
protected void map(LongWritable key, Text value, Context context) 
     throws IOException, InterruptedException { 

    StringTokenizer line = new StringTokenizer(value.toString(), ",\t"); 

    // Skip blank or incomplete lines rather than emitting a stale temperature 
    if (line.countTokens() >= 2) { 
     word.set(line.nextToken().trim()); 
     max.set(Integer.parseInt(line.nextToken().trim())); 
     context.write(word, max); 
    } 
} 
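
For anyone who wants to try the guard without a cluster, here is a minimal plain-Java sketch of the same check. `RecordParser` and `parse` are hypothetical names, not part of the post or of Hadoop, and the sketch assumes that a line with a non-numeric temperature should be skipped rather than fail the task.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Optional;
import java.util.StringTokenizer;

// Hypothetical helper (not from the original post): parses one "city,temperature" line.
class RecordParser {

    // Returns the (city, temperature) pair, or Optional.empty() for blank or malformed lines.
    static Optional<Map.Entry<String, Integer>> parse(String line) {
        StringTokenizer tokens = new StringTokenizer(line, ",\t");

        // Guard: both the city and the temperature must be present.
        if (tokens.countTokens() < 2) {
            return Optional.empty();
        }

        String city = tokens.nextToken().trim();
        try {
            // trim() tolerates stray spaces around the number, e.g. "56 ".
            int temperature = Integer.parseInt(tokens.nextToken().trim());
            return Optional.of(new SimpleEntry<>(city, temperature));
        } catch (NumberFormatException e) {
            // Assumption: non-numeric temperatures are skipped.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String[] lines = {"Kolkata,56 ", "", "Goa", "Delhi,abc"};
        for (String line : lines) {
            System.out.println("[" + line + "] -> " + parse(line));
        }
    }
}
```

Inside the mapper, the equivalent is to call `context.write` only when the parse succeeds.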
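
Isn't the OP already using `while (itr.hasNext())`? – philantrovert

Yes, the OP uses `while (itr.hasNext())` in the reducer, but the exception is thrown in the mapper, where the OP does not check whether a next token is available. – chunjef

Oh, cool, thanks. – philantrovert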
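
To illustrate the point made in the comments, the following plain-Java sketch calls `nextToken()` twice with no guard, as the question's mapper does. `UnguardedTokenDemo` is a hypothetical name, and which input line actually triggered the failure in the question is not known; a blank line or a line with no temperature are only two plausible candidates.

```java
import java.util.NoSuchElementException;
import java.util.StringTokenizer;

// Hypothetical demo class (not from the original post).
class UnguardedTokenDemo {

    // Mimics the unguarded mapper: reads two tokens without checking that they exist.
    // Returns true if StringTokenizer throws NoSuchElementException.
    static boolean unguardedParseThrows(String line) {
        StringTokenizer tokens = new StringTokenizer(line, ",\t");
        try {
            tokens.nextToken(); // city
            tokens.nextToken(); // temperature
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("blank line throws: " + unguardedParseThrows(""));
        System.out.println("\"Goa\" throws: " + unguardedParseThrows("Goa"));
        System.out.println("\"Kolkata,56\" throws: " + unguardedParseThrows("Kolkata,56"));
    }
}
```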