
I am getting the following "Pass a Delete or a Put" error while running an HBase MapReduce job:

java.io.IOException: Pass a Delete or a Put 
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:125) 
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:84) 
    at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.write(MapTask.java:639) 
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80) 
    at HBaseImporter$InnerMap.map(HBaseImporter.java:61) 
    at HBaseImporter$InnerMap.map(HBaseImporter.java:1) 
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144) 
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212) 
12/11/27 16:16:50 INFO mapred.JobClient: map 0% reduce 0% 
12/11/27 16:16:50 INFO mapred.JobClient: Job complete: job_local_0001 
12/11/27 16:16:50 INFO mapred.JobClient: Counters: 0 

Code:

public class HBaseImporter extends Configured implements Tool {

    public static class InnerMap extends TableMapper<Text, IntWritable> {
        // Emit a count of 1 per word (new IntWritable() would default to 0).
        private final IntWritable one = new IntWritable(1);

        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            String val = new String(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("line")));
            String[] words = val.split(" ");
            for (String word : words) {
                context.write(new Text(word), one);
            }
        }
    }

    public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int i = 0;
            for (IntWritable val : values) {
                i += val.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(i));
            context.write(null, put);
        }
    }


    public int run(String[] args) throws Exception {
        //Configuration conf = getConf();
        Configuration conf = HBaseConfiguration.create();
        conf.addResource(new Path("/home/trg/hadoop-1.0.4/conf/core-site.xml"));
        conf.addResource(new Path("/home/trg/hadoop-1.0.4/conf/hdfs-site.xml"));

        Job job = new Job(conf, "SM LogAnalyzer MR");
        job.setJarByClass(HBaseImporter.class);
        //FileInputFormat.setInputPaths(job, new Path(args[1]));
        //FileOutputFormat.setOutputPath(job, new Path("outyy"));
        //job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //job.setMapperClass(InnerMap.class);
        Scan scan = new Scan();
        scan.setCaching(500);       // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false);
        TableMapReduceUtil.initTableMapperJob(
            "wc_in",            // input table
            scan,               // Scan instance to control CF and attribute selection
            InnerMap.class,     // mapper class
            Text.class,         // mapper output key
            IntWritable.class,  // mapper output value
            job);

        TableMapReduceUtil.initTableReducerJob(
            "word_count",         // output table
            MyTableReducer.class, // reducer class
            job);
        job.setNumReduceTasks(1);

        job.setNumReduceTasks(0);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        //Configuration conf = new HBaseConfiguration();
        //Job job = configureJob(conf, args);
        //System.exit(job.waitForCompletion(true) ? 0 : 1);

        String[] inArgs = new String[4];
        inArgs[0] = "HBaseImporter";
        inArgs[1] = "/user/trg/wc_in";
        inArgs[2] = "AppLogMRImport";
        inArgs[3] = "MessageDB";
        int res = ToolRunner.run(new Configuration(), new HBaseImporter(), inArgs);
        //int res = ToolRunner.run(new Configuration(), new HBaseImporter(), args);
        System.exit(res);
    }
}

I am setting the map output value class to IntWritable.class, yet TableOutputFormat.write is still being invoked on the mapper's output, and it expects a Put object.

Answers


Found the answer to my own question: I had mistakenly set the number of reduce tasks to 0.

job.setNumReduceTasks(0); 

With zero reduce tasks, the mapper's output goes straight to the output table, so TableOutputFormat expects the mapper to write Put objects directly into the HBase table. Removing the line above solved the problem.
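
(Side note: if a map-only import into HBase is what you actually want, keeping setNumReduceTasks(0) is fine, but then the mapper itself has to emit Put values. A minimal sketch under that assumption, reusing the question's tables and the same 0.94-era API; the PutMap name is made up for illustration:)

public static class PutMap extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context)
            throws IOException, InterruptedException {
        // Copy the cf:line cell straight into the output table; with zero
        // reducers this Put goes directly to TableOutputFormat, which accepts it.
        byte[] line = value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("line"));
        Put put = new Put(row.get());
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("line"), line);
        context.write(row, put);
    }
}

Wire it up with TableMapReduceUtil.initTableMapperJob("wc_in", scan, PutMap.class, ImmutableBytesWritable.class, Put.class, job) and TableMapReduceUtil.initTableReducerJob("word_count", null, job) (a null reducer class just installs TableOutputFormat), then keep job.setNumReduceTasks(0).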


The problem is not really solved by the following line:

job.setNumReduceTasks(0); 

That only disables the reduce phase and skips it entirely; it sidesteps the real problem rather than fixing it.

There is also a problem with the configuration: you should include both mapred-site.xml and hbase-site.xml as configuration resources.
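
For illustration, the configuration block in run() might then look like this; the first two paths are the ones already in the question, while the mapred-site.xml and hbase-site.xml locations are guesses at the poster's install layout:

Configuration conf = HBaseConfiguration.create();
conf.addResource(new Path("/home/trg/hadoop-1.0.4/conf/core-site.xml"));
conf.addResource(new Path("/home/trg/hadoop-1.0.4/conf/hdfs-site.xml"));
// The two resources this answer says are missing (paths are assumptions):
conf.addResource(new Path("/home/trg/hadoop-1.0.4/conf/mapred-site.xml"));
conf.addResource(new Path("/home/trg/hbase/conf/hbase-site.xml"));

Alternatively, putting those conf directories on the classpath lets HBaseConfiguration.create() pick up hbase-site.xml automatically, without hard-coded paths.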