
I am trying to upload data from a file on HDFS into an HBase table, using HFileOutputFormat2 as the OutputFormat, but I am getting the following ClassCastException:

java.lang.Exception: java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.Cell 
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462) 
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522) 

Caused by: java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.Cell 
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:148) 
at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.write(MapTask.java:635) 
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89) 
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112) 
at com.xogito.ingestion.mr.hbase.CSEventsHBaseMapper.map(CSEventsHBaseMapper.java:90) 
at com.xogito.ingestion.mr.hbase.CSEventsHBaseMapper.map(CSEventsHBaseMapper.java:1) 
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) 
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764) 
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340) 
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441) 
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) 
at java.util.concurrent.FutureTask.run(FutureTask.java:138) 
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
at java.lang.Thread.run(Thread.java:662) 

Below is the code for the job:
@Override
public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "MY_JOB");
    job.setJarByClass(getClass());
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    job.setSpeculativeExecution(false);
    job.setReduceSpeculativeExecution(false);
    job.setMapperClass(CustomMapper.class); // custom mapper
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(HFileOutputFormat.class);

    String parentInputPath = args[0];
    String parentOutputPath = args[1];

    FileInputFormat.addInputPaths(job, parentInputPath);
    HFileOutputFormat.setOutputPath(job, new Path(parentOutputPath));

    Configuration hConf = HBaseConfiguration.create(conf);
    hConf.set("hbase.zookeeper.quorum", "x.x.x.x");
    hConf.set("hbase.zookeeper.property.clientPort", "2181");
    HTable hTable = new HTable(hConf, "mytable");
    // hTable.setAutoFlush(false, true);
    // hTable.setWriteBufferSize(1024 * 1024 * 12);

    HFileOutputFormat.configureIncrementalLoad(job, hTable);
    job.setNumReduceTasks(0);
    job.submit();
    return 0;
}

And the mapper code is below:

@Override
public void map(WritableComparable key, Writable val, Context context)
        throws IOException, InterruptedException {
    String data = val.toString();
    String[] splitted = data.split("\t");

    String account = splitted[1];
    Matcher match = ACCOUNT.matcher(account);
    int clientid = 0;
    if (match.find()) {
        clientid = Integer.parseInt(match.group(1));
    }

    String userid = splitted[2];
    Long timestamp = 0L;
    try {
        timestamp = Long.valueOf(splitted[10]);
    } catch (Exception e) {
        LOGGER.error(e.getMessage(), e);
    }

    String rowKeyText = "somtext";
    ImmutableBytesWritable rowKey =
            new ImmutableBytesWritable(Bytes.toBytes(rowKeyText));

    Put put = new Put(Bytes.toBytes(rowKeyText));
    put.add(cf, column, value); // cf, column and value are defined elsewhere
    context.write(rowKey, put);
}

Were you able to solve this problem? – Joel 2015-07-30 18:18:44

Answer


HFileOutputFormat, and its newer replacement HFileOutputFormat2, need KeyValue as the final output class. Most likely PutSortReducer, which converts Put objects into KeyValue instances, was not applied.
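In the driver above, the likely culprit is job.setNumReduceTasks(0): configureIncrementalLoad already wires in PutSortReducer, a TotalOrderPartitioner, and one reduce task per table region, and forcing the job to be map-only afterwards throws that reducer away, so the mapper's Put values reach the output format directly and fail the cast to Cell. A minimal sketch of the corrected tail of run(), assuming the rest stays as posted:

HFileOutputFormat.configureIncrementalLoad(job, hTable);
// Do NOT set the reduce task count to 0 here: configureIncrementalLoad has
// already installed PutSortReducer, and a map-only job would hand the
// mapper's Put values straight to the output format, which expects
// Cell/KeyValue instances -- exactly the ClassCastException above.
return job.waitForCompletion(true) ? 0 : 1;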

In my case I was not using MapReduce but Spark, so I simply created KeyValues directly instead of Puts.
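The same idea works in plain MapReduce if you want to keep the job map-only: have the mapper emit KeyValue, which implements Cell, instead of Put. Below is a hypothetical sketch with placeholder row/family/qualifier names; note that HFileOutputFormat2 requires cells in sorted row-key order, so without the sort reducer the input files must already be sorted by row key:

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper that emits KeyValue (a Cell implementation) so that
// no PutSortReducer is needed. "cf" and "col" are placeholder names.
public class CellEmittingMapper
        extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

    private static final byte[] CF  = Bytes.toBytes("cf");  // placeholder family
    private static final byte[] COL = Bytes.toBytes("col"); // placeholder qualifier

    @Override
    protected void map(LongWritable key, Text val, Context context)
            throws IOException, InterruptedException {
        String[] fields = val.toString().split("\t");
        byte[] row = Bytes.toBytes(fields[0]);
        // KeyValue implements Cell, so HFileOutputFormat2 accepts it directly.
        KeyValue kv = new KeyValue(row, CF, COL, Bytes.toBytes(fields[1]));
        context.write(new ImmutableBytesWritable(row), kv);
    }
}

The driver would then declare job.setMapOutputValueClass(KeyValue.class); and if configureIncrementalLoad is still called with reducers enabled, it picks KeyValueSortReducer instead of PutSortReducer and handles the sorting for you.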