2011-01-20 61 views
3

我最近開始使用Hadoop,並且在將Mapfile用作MapReduce作業的輸入時遇到了問題。Mapfile作爲MapReduce作業的輸入

以下工作代碼在hdfs中寫入一個名爲「TestMap」的簡單MapFile,其中有三個Text類型的鍵和三個BytesWritable類型的值。

這裏TestMap的內容:

$ hadoop fs -text /user/hadoop/TestMap/data 
11/01/20 11:17:58 INFO util.NativeCodeLoader: Loaded the native-hadoop library 
11/01/20 11:17:58 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library 
11/01/20 11:17:58 INFO compress.CodecPool: Got brand-new decompressor 
A 01 
B 02 
C 03 

這裏是創建TestMap映射文件的程序:

下面嘗試通過一個遞增
import java.io.IOException; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.io.MapFile; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.BytesWritable; 
import org.apache.hadoop.io.SequenceFile; 
import org.apache.hadoop.io.IOUtils; 

public class CreateMap { 

    public static void main(String[] args) throws IOException{ 

     Configuration conf = new Configuration(); 
     FileSystem hdfs = FileSystem.get(conf); 

     Text key = new Text(); 
     BytesWritable value = new BytesWritable(); 
     byte[] data = {1, 2, 3}; 
     String[] strs = {"A", "B", "C"}; 
     int bytesRead; 
     MapFile.Writer writer = null; 

     writer = new MapFile.Writer(conf, hdfs, "TestMap", key.getClass(), value.getClass()); 
     try { 
      for (int i = 0; i < 3; i++) { 
       key.set(strs[i]); 
       value.set(data, i, 1); 
       writer.append(key, value); 
       System.out.println(strs[i] + ":" + data[i] + " added."); 
      } 
     } 
     catch (IOException e) { 
      e.printStackTrace(); 
     } 
     finally { 
      IOUtils.closeStream(writer); 
     } 
    } 
} 

簡單的MapReduce工作映射文件中的值:

import java.io.IOException; 
import java.util.Iterator; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.FileInputFormat; 
import org.apache.hadoop.mapred.FileOutputFormat; 
import org.apache.hadoop.mapred.JobClient; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.SequenceFileInputFormat; 
import org.apache.hadoop.mapred.MapReduceBase; 
import org.apache.hadoop.mapred.Mapper; 
import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.Reducer; 
import org.apache.hadoop.mapred.Reporter; 
import org.apache.hadoop.mapred.TextOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 
import org.apache.hadoop.io.BytesWritable; 


public class AddOne extends Configured implements Tool { 

    public static class MapClass extends MapReduceBase 

     implements Mapper<Text, BytesWritable, Text, Text> { 

     public void map(Text key, BytesWritable value, 
         OutputCollector<Text, Text> output, 
         Reporter reporter) throws IOException { 


      byte[] data = value.getBytes(); 
      data[0] += 1; 
      value.set(data, 0, 1); 
      output.collect(key, new Text(value.toString())); 
     } 
    } 

    public static class Reduce extends MapReduceBase 
     implements Reducer<Text, Text, Text, Text> { 

     public void reduce(Text key, Iterator<Text> values, 
          OutputCollector<Text, Text> output, 
          Reporter reporter) throws IOException { 

      output.collect(key, values.next()); 
     } 
    } 

    public int run(String[] args) throws Exception { 
     Configuration conf = getConf(); 

     JobConf job = new JobConf(conf, AddOne.class); 

     Path in = new Path("TestMap"); 
     Path out = new Path("output"); 
     FileInputFormat.setInputPaths(job, in); 
     FileOutputFormat.setOutputPath(job, out); 

     job.setJobName("AddOne"); 
     job.setMapperClass(MapClass.class); 
     job.setReducerClass(Reduce.class); 

     job.setInputFormat(SequenceFileInputFormat.class); 
     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(Text.class); 

     job.setOutputFormat(TextOutputFormat.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 
     job.set("key.value.separator.in.input.line", ":"); 


     JobClient.runJob(job); 

     return 0; 
    } 

    public static void main(String[] args) throws Exception { 
     int res = ToolRunner.run(new Configuration(), new AddOne(), args); 

     System.exit(res); 
    } 
} 

我得到的運行時異常是:

java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.BytesWritable 
    at AddOne$MapClass.map(AddOne.java:32) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307) 
    at org.apache.hadoop.mapred.Child.main(Child.java:170) 

我不明白爲什麼Hadoop是努力蒙上了LongWritable,因爲在我的代碼定義映射程序接口的正確(Mapper<Text, BytesWritable, Text, Text>)。

有人能幫助我嗎?

非常感謝您

盧卡

回答

15

你的問題來自於一個事實是,儘管什麼名字告訴你,一個MapFile文件。

MapFile實際上是一個由兩個文件組成的目錄:有一個「數據」文件,它是一個SequenceFile,其中包含您寫入其中的鍵和值;然而,還有一個「索引」文件,它是一個不同的SequenceFile,其中包含鍵的子序列以及它們作爲LongWritables的偏移量;通過MapFile.Reader將此索引加載到內存中,以便快速進行二分查找,以便在隨機訪問時在數據文件中找到具有所需數據的偏移量。您正在使用舊的"org.apache.hadoop.mapred" version of SequenceFileInputFormat。當你告訴它將MapFile作爲輸入來查看數據文件時,知道它並不夠聰明;相反,它實際上試圖將索引文件的數據文件用作常規輸入文件。數據文件將正常工作,因爲類與您指定的內容一致,但索引文件將拋出ClassCastException,因爲索引文件值都是LongWritables。

您有兩種選擇:您可以開始使用"org.apache.hadoop.mapreduce" version of SequenceFileInputFormat(從而更改代碼的其他部分),它可以充分了解MapFiles以查看數據文件;或者,您可以明確地將數據文件作爲您想要的文件輸入。

0

的方法之一可能是使用自定義InputFormat有一個記錄整個映射文件塊,並查找由密鑰從地圖()