I recently started using Hadoop, and I am running into a problem when using a MapFile as the input to a MapReduce job.
The following working code writes a simple MapFile called "TestMap" to HDFS; it contains three keys of type Text and three values of type BytesWritable.
Here is the content of TestMap:
$ hadoop fs -text /user/hadoop/TestMap/data
11/01/20 11:17:58 INFO util.NativeCodeLoader: Loaded the native-hadoop library
11/01/20 11:17:58 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
11/01/20 11:17:58 INFO compress.CodecPool: Got brand-new decompressor
A 01
B 02
C 03
Here is the program that creates the TestMap MapFile:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.IOUtils;
public class CreateMap {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);

        Text key = new Text();
        BytesWritable value = new BytesWritable();
        byte[] data = {1, 2, 3};
        String[] strs = {"A", "B", "C"};
        int bytesRead;

        MapFile.Writer writer = null;
        writer = new MapFile.Writer(conf, hdfs, "TestMap", key.getClass(), value.getClass());
        try {
            for (int i = 0; i < 3; i++) {
                key.set(strs[i]);
                value.set(data, i, 1);
                writer.append(key, value);
                System.out.println(strs[i] + ":" + data[i] + " added.");
            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        finally {
            IOUtils.closeStream(writer);
        }
    }
}
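For reference, this is a minimal read-back sketch of how I would double-check what ended up in TestMap; the ReadMap class name is just for illustration, and it only uses the MapFile.Reader API:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class ReadMap {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);

        // Open the MapFile directory written by CreateMap.
        MapFile.Reader reader = new MapFile.Reader(hdfs, "TestMap", conf);
        try {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            // next() walks the entries in key order and returns false at the end.
            while (reader.next(key, value)) {
                System.out.println(key + " -> " + value);
            }
        } finally {
            reader.close();
        }
    }
}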
The simple MapReduce job below tries to increment each value in the MapFile by one:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.BytesWritable;
public class AddOne extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
        implements Mapper<Text, BytesWritable, Text, Text> {

        public void map(Text key, BytesWritable value,
                        OutputCollector<Text, Text> output,
                        Reporter reporter) throws IOException {
            byte[] data = value.getBytes();
            data[0] += 1;
            value.set(data, 0, 1);
            output.collect(key, new Text(value.toString()));
        }
    }

    public static class Reduce extends MapReduceBase
        implements Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, Text> output,
                           Reporter reporter) throws IOException {
            output.collect(key, values.next());
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, AddOne.class);

        Path in = new Path("TestMap");
        Path out = new Path("output");
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("AddOne");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormat(SequenceFileInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.set("key.value.separator.in.input.line", ":");

        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AddOne(), args);
        System.exit(res);
    }
}
The runtime exception I get is:
java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.BytesWritable
at AddOne$MapClass.map(AddOne.java:32)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
I don't understand why Hadoop is trying to cast a LongWritable, since in my code I define the Mapper interface correctly (Mapper<Text, BytesWritable, Text, Text>).
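To see what the framework is actually handing to my mapper, I could temporarily swap in a mapper that only logs the runtime classes of the incoming records (a debugging sketch; DebugMapClass is a hypothetical name, it needs an extra import of org.apache.hadoop.io.Writable, and it would be enabled with job.setMapperClass(DebugMapClass.class)):

    public static class DebugMapClass extends MapReduceBase
        implements Mapper<Writable, Writable, Text, Text> {

        public void map(Writable key, Writable value,
                        OutputCollector<Text, Text> output,
                        Reporter reporter) throws IOException {
            // Log the concrete key/value classes delivered by the InputFormat;
            // for the data records I would expect Text and BytesWritable here.
            System.err.println(key.getClass().getName()
                + " / " + value.getClass().getName());
        }
    }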
Can somebody help me?
Thank you very much,
Luca