2016-08-22 42 views
3

我想通過在python-api上使用像QualiferFilter這樣的過濾器從HBase獲取行。
我知道從代碼中獲取HBase的行的方式。Spark:如何使用HBase過濾器,例如由python-api提供的QualiferFilter

host = 'localhost' 
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter" 
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter" 
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": "user", 
       "hbase.mapreduce.scan.columns": "u:uid", 
       "hbase.mapreduce.scan.row.start": "1", "hbase.mapreduce.scan.row.stop": "100"} 
rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat", 
          "org.apache.hadoop.hbase.io.ImmutableBytesWritable", 
         "org.apache.hadoop.hbase.client.Result", 
         keyConverter=keyConv, valueConverter=valueConv, conf=conf) 

但是,我也想通過使用過濾器獲得行。
我需要添加什麼類型的代碼?

+0

你找出如何做到這一點? – void

+0

我找不到解決方案。畢竟,我使用Scala API。我認爲Python API還不能用於生產環境。 – penlight

回答

0

你好,你可以檢查此代碼................

def doYourStuff(row): 
    text = row.split("\n") 
    data = {} 
    for row in text: 
     if json.loads(row)["qualifier"] == "message": 
       data["message"] = json.loads(row)["value"] 
     if json.loads(row)["qualifier"] == "domain": 
       data["domain"] = json.loads(row)["value"] 
     data["rowKey"] = json.loads(row)["row"] 
     return DoWhatYouWantToDo(data) 

    def save_record(rdd): 
     host = '[email protected]@[email protected]@' 
     table = 'TableName' 
     keyConv1 = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter" 
     valueConv1 = "org.apache.spark.examples.pythonconverters.StringListToPutConverter" 
     conf = {"hbase.zookeeper.quorum": host, 
       "hbase.mapred.outputtable": table, 
       "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat", 
       "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable", 
       "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"} 
     rdd.saveAsNewAPIHadoopDataset(
      keyConverter=keyConv1, valueConverter=valueConv1,conf=conf) 


    hbaseRdd = hbaseRdd.map(lambda x: x[1]) # message_rdd = hbase_rdd.map(lambda x:x[0]) will give only row-key 

    processedRdd = hbaseRdd.map(lambda x: doYourStuff(x)) 
    save_record(processedRdd) 
相關問題