2017-03-05 30 views
0

我正在考慮如何使用Spark的HBase ColumnRangeFilter。
我看看org.apache.hadoop.hbase.mapreduce.TableInputFormat,但是這個API不包含ColumnRangeFilter。
所以我不知道如何由Spark做ColumnRangeFilter。如何使用Spark的HBase ColumnRangeFilter

我想使用以「20170225」開頭並以「20170305」結尾的ColumnRangeFilter。

我咳嗽掃描像下面的代碼。

val conf = HBaseConfiguration.create() 
conf.set(TableInputFormat.INPUT_TABLE, "like_count") 
val startRow = "001" 
val endRow = "100" 
conf.set(TableInputFormat.SCAN_ROW_START, startRow) 
conf.set(TableInputFormat.SCAN_ROW_STOP, endRow) 
sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]) 

我需要添加哪些代碼? 如果有人有任何建議,請告訴我。

回答

0

使用掃描對象來設置開始和結束行,並設置在配置的HBase該掃描對象然後該配置對象傳遞給tableInputFormat https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Scan.html

Scan scan = new Scan(startRow, endRow); 
scan.setMaxVersions(MAX_VERSIONS); 

//This can also be done if not specified in scan object constructor 
scan.setFilter(new ColumnRangeFilter(startrow,true,endrow,true)); 


HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); 

conf.set(TableInputFormat.INPUT_TABLE, username + ":" + path); 
conf.set(TableInputFormat.SCAN, convertScanToString(scan)); 


tableInputFormat.setConf(conf); 
相關問題