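HBase MapReduce job using MultipleInputs: cannot cast LongWritable to ImmutableBytesWritable
I am writing an MR job that takes an HBase table as input and dumps it to an HDFS file. I use the MultipleInputs class (from Hadoop) because I plan to take multiple data sources. I wrote a very simple MR program (see the source code below). Unfortunately, I run into the following error: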
java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hbase.io.ImmutableBytesWritable
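The exception says that a LongWritable key reached code that expects an ImmutableBytesWritable. The plain-Java toy below (not Hadoop code; ToyMapper, RowKeyMapper and ErasureDemo are hypothetical names) sketches why such a mismatch compiles fine and only fails at run time: generic type arguments are erased, so a framework that passes keys around as Object ends up calling a compiler-generated bridge method, and that bridge method's cast is what throws.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Hadoop's Mapper<KEYIN, ...>: generic in its key type.
abstract class ToyMapper<K> {
    abstract String map(K key);
}

// Hypothetical stand-in for TableMap: expects byte[] row keys,
// roughly the way TableMap expects ImmutableBytesWritable keys.
class RowKeyMapper extends ToyMapper<byte[]> {
    @Override
    String map(byte[] row) {
        return new String(row, StandardCharsets.UTF_8);
    }
}

class ErasureDemo {
    // Calls map() through a raw reference, as a framework that only sees
    // Object keys at run time would. The compiler-generated bridge method
    // RowKeyMapper.map(Object) casts the key to byte[]; a wrong type fails there.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static String feed(ToyMapper mapper, Object key) {
        try {
            mapper.map(key);
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        ToyMapper<byte[]> mapper = new RowKeyMapper();
        System.out.println(feed(mapper, "row1".getBytes(StandardCharsets.UTF_8))); // ok
        System.out.println(feed(mapper, 42L)); // ClassCastException (a Long is not a byte[])
    }
}
```

Presumably, in the real job Hadoop's Mapper.run() plays the role of feed(), and the LongWritable byte offsets that TextInputFormat produces play the role of the Long key.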
I am running pseudo-distributed Hadoop (1.2.0) and pseudo-distributed HBase (0.95.1-hadoop1).
One interesting thing: if I comment out the MultipleInputs line "MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, TableMap.class);", the MR job runs fine.
Here is the complete source code:
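import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MixMR {

    // Reads cf:c1 from each HBase row and emits (row key, cell value).
    public static class TableMap extends TableMapper<Text, Text> {
        public static final byte[] CF = Bytes.toBytes("cf");
        public static final byte[] ATTR1 = Bytes.toBytes("c1");

        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            String key = Bytes.toString(row.get());
            byte[] raw = value.getValue(CF, ATTR1);
            if (raw == null) {
                return; // row has no cf:c1 cell; skip it instead of throwing a NullPointerException
            }
            context.write(new Text(key), new Text(Bytes.toString(raw)));
        }
    }

    // Pass-through reducer: writes every (key, value) pair unchanged.
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(key, val);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Path inputPath1 = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        String tableName1 = "test";

        Configuration config = HBaseConfiguration.create();
        Job job = new Job(config, "ExampleRead");
        job.setJarByClass(MixMR.class); // class that contains the mapper

        Scan scan = new Scan();
        scan.setCaching(500);       // 1 is the default in Scan, which is bad for MapReduce jobs
        scan.setCacheBlocks(false); // don't set to true for MR jobs
        scan.addFamily(Bytes.toBytes("cf"));

        TableMapReduceUtil.initTableMapperJob(
                tableName1,     // input HBase table name
                scan,           // Scan instance to control CF and attribute selection
                TableMap.class, // mapper
                Text.class,     // mapper output key
                Text.class,     // mapper output value
                job);

        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // inputPath1 here has no effect on the HBase table input
        MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, TableMap.class);

        FileOutputFormat.setOutputPath(job, outputPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}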
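A likely cause, judging from how Hadoop's MultipleInputs.addInputPath(Job, Path, Class, Class) is implemented: besides recording the path-to-format and path-to-mapper mappings, it calls job.setInputFormatClass(DelegatingInputFormat.class) and job.setMapperClass(DelegatingMapper.class). Called after initTableMapperJob, it therefore replaces the TableInputFormat that initTableMapperJob configured. The only remaining input is then inputPath1 read through TextInputFormat, whose (LongWritable, Text) records are handed to TableMap. That would also explain why commenting out the line makes the job run. The sketch below models this "last setter wins" behaviour with a plain map; ToyJob, OverwriteDemo and all key names are simplified placeholders, not Hadoop's real classes or configuration keys.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a job configuration: each setter overwrites a key, so the last call wins.
// Key names are simplified placeholders, not Hadoop's real configuration keys.
class ToyJob {
    final Map<String, String> conf = new HashMap<>();

    void setInputFormatClass(String cls) { conf.put("inputformat", cls); }
    void setMapperClass(String cls) { conf.put("mapper", cls); }
}

class OverwriteDemo {
    // Simplified model of what initTableMapperJob sets on the job.
    static void initTableMapperJob(ToyJob job, String mapper) {
        job.setInputFormatClass("TableInputFormat");
        job.setMapperClass(mapper);
    }

    // Simplified model of MultipleInputs.addInputPath: record the per-path
    // format and mapper, then switch the job to the delegating classes.
    static void addInputPath(ToyJob job, String path, String format, String mapper) {
        job.conf.merge("dir.formats", path + ";" + format, (a, b) -> a + "," + b);
        job.conf.merge("dir.mappers", path + ";" + mapper, (a, b) -> a + "," + b);
        job.setInputFormatClass("DelegatingInputFormat");
        job.setMapperClass("DelegatingMapper");
    }

    public static void main(String[] args) {
        ToyJob job = new ToyJob();
        initTableMapperJob(job, "TableMap");
        addInputPath(job, "/input1", "TextInputFormat", "TableMap");
        System.out.println(job.conf.get("inputformat")); // DelegatingInputFormat
        System.out.println(job.conf.get("dir.formats")); // /input1;TextInputFormat
    }
}
```

In this model, TableInputFormat no longer appears anywhere in the final configuration, which mirrors the situation where the HBase table silently stops being an input.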