
HBase MapReduce job using MultipleInputs: cannot cast LongWritable to ImmutableBytesWritable

I am writing an MR job that takes an HBase table as input and dumps the result to an HDFS file. I am using the MultipleInputs class (from Hadoop) because I intend to take multiple data sources. I wrote a very simple MR program (see the source code below). Unfortunately, I ran into the following error:

java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hbase.io.ImmutableBytesWritable

I am running pseudo-distributed Hadoop (1.2.0) and pseudo-distributed HBase (0.95.1-hadoop1).

Here is the complete source code. One interesting thing: if I comment out the MultipleInputs line "MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, TableMap.class);", the MR job runs fine. (A sketch of a text-file mapper that does fit TextInputFormat follows the listing.)

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MixMR {

    // Mapper over the HBase table: receives one row key and its Result per call.
    public static class TableMap extends TableMapper<Text, Text> {
        public static final byte[] CF = "cf".getBytes();
        public static final byte[] ATTR1 = "c1".getBytes();

        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            String key = Bytes.toString(row.get());
            String val = new String(value.getValue(CF, ATTR1));
            context.write(new Text(key), new Text(val));
        }
    }

    // Identity-style reducer: re-emits every value under its key.
    public static class Reduce extends Reducer<Object, Text, Object, Text> {
        @Override
        public void reduce(Object key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String ks = key.toString();
            for (Text val : values) {
                context.write(new Text(ks), val);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Path inputPath1 = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        String tableName1 = "test";

        Configuration config = HBaseConfiguration.create();
        Job job = new Job(config, "ExampleRead");
        job.setJarByClass(MixMR.class); // class that contains the mapper

        Scan scan = new Scan();
        scan.setCaching(500);       // 1 is the default in Scan, which is bad for MapReduce jobs
        scan.setCacheBlocks(false); // don't set to true for MR jobs
        scan.addFamily(Bytes.toBytes("cf"));

        TableMapReduceUtil.initTableMapperJob(
                tableName1,     // input HBase table name
                scan,           // Scan instance to control CF and attribute selection
                TableMap.class, // mapper
                Text.class,     // mapper output key
                Text.class,     // mapper output value
                job);
        job.setReducerClass(Reduce.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // inputPath1 here has no effect for the HBase table
        MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, TableMap.class);

        FileOutputFormat.setOutputPath(job, outputPath);

        job.waitForCompletion(true);
    }
}
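As an aside, the second data source mentioned above would typically be a plain HDFS file, and with MultipleInputs each input format must be paired with a mapper whose input types match that format. A minimal sketch with a hypothetical HdfsFileMap class (not part of the original program) that fits TextInputFormat's LongWritable/Text records:

// Nested inside MixMR; also needs org.apache.hadoop.io.LongWritable and
// org.apache.hadoop.mapreduce.Mapper among the imports.
// Hypothetical mapper for a tab-separated HDFS text file: TextInputFormat
// feeds it LongWritable byte offsets and Text lines, so its input types
// match, unlike TableMap, which expects ImmutableBytesWritable/Result pairs.
public static class HdfsFileMap extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] parts = line.toString().split("\t", 2); // assume "key<TAB>value" lines
        if (parts.length == 2) {
            context.write(new Text(parts[0]), new Text(parts[1]));
        }
    }
}

// Registered alongside the HBase source (inputPath2 is a hypothetical second path):
// MultipleInputs.addInputPath(job, inputPath2, TextInputFormat.class, HdfsFileMap.class);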

Answer


I got the answer: in the statement below, replace TextInputFormat.class with TableInputFormat.class. TextInputFormat produces LongWritable keys and Text values, but TableMap expects the ImmutableBytesWritable/Result pairs of an HBase table scan, which is exactly the ClassCastException above.

MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, TableMap.class);
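TableInputFormat yields the ImmutableBytesWritable/Result pairs that TableMap declares, and it reads the table name and Scan that TableMapReduceUtil.initTableMapperJob already placed in the job configuration. A minimal sketch of the corrected registration:

import org.apache.hadoop.hbase.mapreduce.TableInputFormat;

// TableInputFormat supplies ImmutableBytesWritable keys and Result values,
// matching TableMap's input types; the table name and Scan come from the job
// configuration set by initTableMapperJob, so inputPath1 is ignored for this source.
MultipleInputs.addInputPath(job, inputPath1, TableInputFormat.class, TableMap.class);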
