2014-04-23

HBase bulk delete using a MapReduce job

I'm trying to delete rows from an HBase table with a MapReduce job.

I'm getting the following error:

java.lang.ClassCastException: org.apache.hadoop.hbase.client.Delete cannot be cast to org.apache.hadoop.hbase.KeyValue 
     at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:124) 
     at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:551) 
     at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:85) 
     at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:99) 
     at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:144) 
     at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:164) 
     at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:610) 
     at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:444) 
     at org.apache.hadoop.mapred.Child$4.run(Child.java:268) 
     at java.security.AccessController.doPrivileged(Native Method) 
     at javax.security.auth.Subject.doAs(Subject.java:396) 
     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation. 

It looks like this is caused by configureIncrementalLoad setting the output value type to KeyValue. It ships only PutSortReducer and KeyValueSortReducer, but no DeleteSortReducer.
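For reference, the reducer selection inside configureIncrementalLoad looks roughly like this (paraphrased from the HBase 0.94-era source; details vary by version). Since Delete matches neither branch, the default identity reducer stays in place and Delete objects reach the KeyValue-only HFile writer, which is exactly the cast failure above:

// Paraphrased from HFileOutputFormat.configureIncrementalLoad (0.94-era):
// the reducer is picked from the job's map output value class.
if (KeyValue.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(KeyValueSortReducer.class);
} else if (Put.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(PutSortReducer.class);
} else {
    // Delete falls through to here: only a warning, no reducer is set.
    LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
}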

My code:

import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.client.Delete; 
import org.apache.hadoop.hbase.client.HTable; 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.GenericOptionsParser; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 

public class DeleteRows extends Configured implements Tool { 

    public static class Map extends 
      Mapper<LongWritable, Text, ImmutableBytesWritable, Delete> { 

     ImmutableBytesWritable hKey = new ImmutableBytesWritable(); 
     Delete delRow; 

     @Override 
     protected void map(LongWritable key, Text value, Context context) 
       throws IOException, InterruptedException { 
      // Use only the valid bytes of the line; Text's backing buffer may be padded.
      hKey.set(value.getBytes(), 0, value.getLength()); 
      delRow = new Delete(hKey.get()); 
      context.write(hKey, delRow); 
      // Update counters 
      context.getCounter("RowsDeleted", "Success").increment(1); 
     } 
    } 


    @SuppressWarnings("deprecation") 
    public int run(String[] args) throws Exception { 
     Configuration conf = new Configuration(); 
     args = new GenericOptionsParser(conf, args).getRemainingArgs(); 
     HBaseConfiguration.addHbaseResources(conf); 

     Job job = new Job(conf, "Delete stuff!"); 
     job.setJarByClass(DeleteRows.class); 

     job.setMapperClass(Map.class); 
     job.setMapOutputKeyClass(ImmutableBytesWritable.class); 
     job.setMapOutputValueClass(Delete.class); 

     job.setInputFormatClass(TextInputFormat.class); 
     FileInputFormat.addInputPath(job, new Path(args[0])); 

     HTable hTable = new HTable(args[2]); 
     // Auto configure partitioner and reducer 
     HFileOutputFormat.configureIncrementalLoad(job, hTable); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 

     return job.waitForCompletion(true) ? 0 : 1; 
    } 

    public static void main(String[] args) throws Exception { 
     int exitCode = ToolRunner.run(new DeleteRows(), args); 
     System.exit(exitCode); 
    } 
} 

Is there a better/faster way to delete a large number of rows given their row keys? Deleting each row individually from the mapper is obviously possible, but I imagine that would be slower than bulk-pushing the deletes to the right region servers.

Answers

0 votes

It turns out the code works fine once the reducer is set up with TableMapReduceUtil.initTableReducerJob instead of HFileOutputFormat.configureIncrementalLoad:

TableMapReduceUtil.initTableReducerJob(tableName, null, job); 
job.setNumReduceTasks(0); 
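
For context, a hedged sketch of the question's run() with this change applied, assuming the same argument layout (args[0] is the input path, args[2] the table name) and an extra import of org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil:

public int run(String[] args) throws Exception { 
    Configuration conf = HBaseConfiguration.create(); 
    args = new GenericOptionsParser(conf, args).getRemainingArgs(); 

    Job job = new Job(conf, "Delete stuff!"); 
    job.setJarByClass(DeleteRows.class); 
    job.setMapperClass(Map.class); 
    job.setMapOutputKeyClass(ImmutableBytesWritable.class); 
    job.setMapOutputValueClass(Delete.class); 
    job.setInputFormatClass(TextInputFormat.class); 
    FileInputFormat.addInputPath(job, new Path(args[0])); 

    // TableOutputFormat sends each Delete to the table over RPC; with zero 
    // reduce tasks the mappers write directly, so no output path is set. 
    TableMapReduceUtil.initTableReducerJob(args[2], null, job); 
    job.setNumReduceTasks(0); 

    return job.waitForCompletion(true) ? 0 : 1; 
} 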

However, this still does not create delete HFiles for the completebulkload utility; it simply executes delete RPCs.

2 votes

Your goal is to generate HFiles with a stream of Deletes (actually, delete markers as KeyValues) inside. The standard way to do that is HFileOutputFormat. In fact you can only put a stream of KeyValues into this format, and there are two standard reducers: PutSortReducer and KeyValueSortReducer. By setting the number of reduce tasks to 0 you are actually passing all your Deletes straight to the output format, which of course cannot work.

Your most obvious options:

  • Add your own DeleteSortReducer. Such a reducer is pretty simple and you can mostly copy an existing one: you only need to extract the individual KeyValues from each Delete and sort them. PutSortReducer is a good example; Put changes arrive unsorted, which is why such a reducer is needed. (A hedged sketch follows this list.)
  • Don't build a stream of Deletes at all; build a stream of suitable KeyValues that carry the delete markers. This is probably the best option for speed. (A second sketch follows below.)
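
A minimal sketch of such a DeleteSortReducer, modeled on PutSortReducer and assuming the 0.94-era client API the question appears to use (where Delete.getFamilyMap() returns Map<byte[], List<KeyValue>>); this is an illustration, not code from HBase itself:

import java.io.IOException; 
import java.util.List; 
import java.util.TreeSet; 

import org.apache.hadoop.hbase.KeyValue; 
import org.apache.hadoop.hbase.client.Delete; 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 
import org.apache.hadoop.mapreduce.Reducer; 

public class DeleteSortReducer extends 
    Reducer<ImmutableBytesWritable, Delete, ImmutableBytesWritable, KeyValue> { 

    @Override 
    protected void reduce(ImmutableBytesWritable row, Iterable<Delete> deletes, 
        Context context) throws IOException, InterruptedException { 
        // Collect the delete-marker KeyValues of every Delete for this row 
        // and emit them in the sorted order HFileOutputFormat requires. 
        TreeSet<KeyValue> sorted = new TreeSet<KeyValue>(KeyValue.COMPARATOR); 
        for (Delete d : deletes) { 
            // Note: a bare 'new Delete(row)' carries an EMPTY family map; the 
            // mapper must call deleteFamily(family) for each column family so 
            // there are explicit DeleteFamily markers to extract here. 
            for (List<KeyValue> kvs : d.getFamilyMap().values()) { 
                sorted.addAll(kvs); 
            } 
        } 
        for (KeyValue kv : sorted) { 
            context.write(row, kv); 
        } 
    } 
} 

Because configureIncrementalLoad does not recognize Delete as a map output value class, you would set this reducer explicitly after calling it: job.setReducerClass(DeleteSortReducer.class).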
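
And a sketch of the second option: emit DeleteFamily markers directly as KeyValues, so the stock KeyValueSortReducer wired up by configureIncrementalLoad applies. The column family name "cf" is a placeholder assumption, and the marker timestamp must be set client-side, since no region server fills in "now" during a bulk load:

import java.io.IOException; 
import java.util.Arrays; 

import org.apache.hadoop.hbase.KeyValue; 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 
import org.apache.hadoop.hbase.util.Bytes; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 

public class DeleteMarkerMapper extends 
    Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> { 

    // Placeholder column family: HFiles are written per family, so a 
    // whole-row delete becomes one DeleteFamily marker per family. 
    private static final byte[] FAMILY = Bytes.toBytes("cf"); 

    @Override 
    protected void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException { 
        // Copy only the valid bytes of the line; Text's buffer may be padded. 
        byte[] row = Arrays.copyOfRange(value.getBytes(), 0, value.getLength()); 
        // A DeleteFamily marker masks every cell in FAMILY whose timestamp 
        // is at or below the marker's own. 
        KeyValue marker = new KeyValue(row, FAMILY, null, 
            System.currentTimeMillis(), KeyValue.Type.DeleteFamily); 
        context.write(new ImmutableBytesWritable(row), marker); 
    } 
} 

With this mapper the job setup from the question stays the same, except for job.setMapOutputValueClass(KeyValue.class) so that configureIncrementalLoad selects KeyValueSortReducer; the resulting HFiles can then be loaded with completebulkload.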

Roman, what do you mean by building a suitable 'KeyValue' containing a delete marker? Maintain my own field that says whether a row is deleted, and then periodically clean out all the rows that need deleting? – reducer


No, in HBase a delete is actually a marker, not an operation: http://hadoop-hbase.blogspot.com/2011/12/deletion-in-hbase.html –


See https://stackoverflow.com/a/39320340/3093 –