Hadoop: strange NullPointerException when running in local mode

I have just started learning Hadoop and picked an example from the book, so I wrote a MapReduce job to run locally that extracts temperatures from the free NCDC data files. Here is a sample of the data:

0143023780999992012010100004+61450+017167FM-12+002799999V0209999C...cut...; 

Each file (I downloaded about 100 of them) is made up of many lines like this one.

My mapper performs a simple parsing operation to extract the temperature from these files, and the whole job returns the maximum temperature.

The mapper and its test:

public class MaxTemperatureMapper extends Mapper<LongWritable,Text,Text,IntWritable> { 

@Override 
public void map(LongWritable key, Text value, Context context) { 
    String record = value.toString(); 
    // The year occupies fixed columns 15-19 of every NCDC record.
    String year = record.substring(15,19); 
    int airTemperature = extractTemp(record); 
    if (isNotValidTemp(record, airTemperature)) return; 
    try { 
     context.write(new Text(year), new IntWritable(airTemperature)); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
} 

// 9999 marks a missing reading; the character after the temperature field
// is a quality code, and only codes 0, 1, 4, 5 and 9 are accepted.
private boolean isNotValidTemp(String record, int airTemperature) { 
    return airTemperature == 9999 || !record.substring(92, 93).matches("[01459]"); 
} 

// The temperature is a signed, fixed-width field at columns 87-92;
// a leading '+' is stripped so that Integer.parseInt() accepts it.
private int extractTemp(String record) { 
    String temp = (record.charAt(87) == '+') 
      ? record.substring(88,92) 
      : record.substring(87,92); 
    return Integer.parseInt(temp); 
} 

} 

public class MaxTemperatureMapperTest { 

@Test 
public void processRecord() { 
    Text value = new Text("0111011120999992012010100004+65450+012217FM-12+000999999V0201301N014019999999N9999999N1+00031-00791099271ADDMA1999999099171MD1810341+9999REMSYN070AAXX 01001 01112 46/// /1314 10003 21079 39917 49927 58034 333 91124;"); 

    new MapDriver<LongWritable, Text, Text, IntWritable>() 
      .withMapper(new MaxTemperatureMapper()) 
      .withInputValue(value) 
      .withOutput(new Text("2012"), new IntWritable(3)) 
      .runTest(); 
} 

@Test 
public void processRecordsFromSuspiciousFile() throws IOException { 
    final InputStream is = getClass().getClassLoader().getSystemResource("023780-99999-2012").openStream(); 
    BufferedReader br = new BufferedReader(new InputStreamReader(is)); 
    String line; 
    Iterator<Integer> ii = Arrays.asList(-114, -120, -65, -45, 1, 4, 6, 6, 10, 16, 18, 29, 32, 17, 7, 16, 22, 8, 8, 20).iterator(); 
    while ((line = br.readLine()) != null) { 
     new MapDriver<LongWritable, Text, Text, IntWritable>() 
       .withMapper(new MaxTemperatureMapper()) 
       .withInputValue(new Text(line)) 
       .withOutput(new Text("2012"), new IntWritable(ii.next())) 
       .runTest(); 
    } 
    br.close(); 


} 
} 

The reducer and its test:

public class MaxTemperatureReducer extends Reducer<Text,IntWritable,Text,IntWritable> { 

@Override 
public void reduce(Text key, Iterable<IntWritable> values, Context context) { 
    // Keep the largest temperature seen for this key (year).
    int maxValue = Integer.MIN_VALUE; 
    for (IntWritable value : values) { 
     maxValue = Math.max(value.get(), maxValue); 
    } 
    try { 
     context.write(key, new IntWritable(maxValue)); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
} 

} 

public class MaxTemperatureReducerTest { 

@Test 
public void processRecord() { 

    new ReduceDriver<Text,IntWritable,Text,IntWritable>() 
      .withReducer(new MaxTemperatureReducer()) 
      .withInputKey(new Text("2012")) 
      .withInputValues(Arrays.asList(new IntWritable(5), new IntWritable(10))) 
      .withOutput(new Text("2012"), new IntWritable(10)) 
      .runTest(); 
} 
} 

And finally the driver class and its test:

public class MaxTemperatureDriver extends Configured implements Tool { 

@Override 
public int run(String[] args) throws Exception { 
    if (args.length != 2) { 
     System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName()); 
     ToolRunner.printGenericCommandUsage(System.err); 
     return -1; 
    } 

    Job job = new Job(getConf(), "Max Temperature"); 
    job.setJarByClass(getClass()); 

    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 

    job.setMapperClass(MaxTemperatureMapper.class); 
    job.setCombinerClass(MaxTemperatureReducer.class); 
    job.setReducerClass(MaxTemperatureReducer.class); 

    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Iterable.class); 

    return job.waitForCompletion(true) ? 0 : 1; 
} 

public static void main(String[] args) throws Exception { 
    int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args); 
    System.exit(exitCode); 
} 
} 

public class MaxTemperatureDriverTest { 

@Test 
public void test() throws Exception { 
    Configuration conf = new Configuration(); 
    conf.set("fs.default.name", "file:///"); 
    conf.set("mapred.job.tracker", "local"); 

    Path input = new Path("file:////home/user/big-data/ncdc/"); 
    Path output = new Path("output"); 

    FileSystem fs = FileSystem.getLocal(conf); 
    fs.delete(output, true); 

    MaxTemperatureDriver driver = new MaxTemperatureDriver(); 
    driver.setConf(conf); 

    int exitCode = driver.run(new String[] { input.toString(), output.toString() }); 
    assertThat(exitCode, is(0)); 
} 
} 

I run the whole process from the command line with:

$> hadoop doop.MaxTemperatureDriver -fs file:/// -jt local ~/big-data/ncdc/ output 

and through the MaxTemperatureDriverTest test, but in both cases I get:

13/09/21 19:45:13 INFO mapred.MapTask: Processing split: file:/home/user/big-data/ncdc/023780-99999-2012:0+5337 
13/09/21 19:45:13 INFO mapred.MapTask: io.sort.mb = 100 
13/09/21 19:45:14 INFO mapred.MapTask: data buffer = 79691776/99614720 
13/09/21 19:45:14 INFO mapred.MapTask: record buffer = 262144/327680 
13/09/21 19:45:14 INFO mapred.LocalJobRunner: Map task executor complete. 
13/09/21 19:45:14 WARN mapred.LocalJobRunner: job_local462595973_0001 
java.lang.Exception: java.lang.NullPointerException 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354) 
Caused by: java.lang.NullPointerException 
    at org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73) 
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:970) 
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:673) 
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:756) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439) 
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:138) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
    at java.lang.Thread.run(Thread.java:662) 

It always throws a NullPointerException, in a rather generic way, when it tries to parse the file 023780-99999-2012, which looks perfectly ordinary. So I wrote a test for it (you can see it above as processRecordsFromSuspiciousFile in the mapper test), but it does not reproduce the error. I also checked the logs, without any success.

Is this caused by a wrong or missing local-mode parameter (number of threads, heap memory, etc.)? Or is there a mistake in the code?

Answer


Hadoop does not know how to serialize Iterable out of the box. If you really intend to use Iterable as your output value class, you also need to register a serializer for it; the typical I/O types used with Hadoop are subclasses of Writable. (This is also why your MRUnit tests pass while the job fails: MapDriver and ReduceDriver call your mapper and reducer directly and never read the driver's job configuration, so the bad output value class is never consulted there.)
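
To see where the NullPointerException itself comes from, here is a minimal probe, a sketch assuming a Hadoop 1.x classpath matching the line numbers in your stack trace (the class name SerializationProbe is mine, just for illustration). SerializationFactory.getSerialization() returns null when no registered Serialization accepts the requested class, and getSerializer() then dereferences that null, which is the frame at SerializationFactory.java:73:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.serializer.SerializationFactory;

public class SerializationProbe {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        SerializationFactory factory = new SerializationFactory(conf);

        // Covered by the default WritableSerialization: prints a serialization.
        System.out.println(factory.getSerialization(IntWritable.class));

        // No registered Serialization accepts Iterable, so this prints null;
        // MapTask dereferences exactly this null via getSerializer() -> NPE.
        System.out.println(factory.getSerialization(Iterable.class));
    }
}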

Update: I now see that you intend to use IntWritable as your output value class. Your problem is this line in your driver:

job.setOutputValueClass(Iterable.class) 

which should be

job.setOutputValueClass(IntWritable.class)  
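
For context, the type setup in run() would then read as below; this is just a sketch of the corrected lines. If your map output types ever differed from the final output types, you would additionally need job.setMapOutputKeyClass() and job.setMapOutputValueClass():

job.setMapperClass(MaxTemperatureMapper.class);
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducer.class);

// These must name the concrete Writable types the job emits,
// not a plain Java interface such as Iterable.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);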

Thanks! Yes, I just did not see it, and could not spot it from the logs and the tests :) – Randomize