2016-09-29 116 views
3

我正在編寫一個項目,接收來自Kafka的數據並寫入Hbase表。因爲我想知道記錄的差異,所以我需要先用Hbase中的相​​同rowkey獲取記錄,然後對接收到的記錄進行相減,最後將新記錄保存到HBase表中。在Spark Streaming中讀取Hbase數據

在開始時,我試着用newAPIHadoop從hbase獲取數據。這裏是我的嘗試:

val conf = HBaseConfiguration.create() 
conf.set("zookeeper.znode.parent", "/hbase-secure") 
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName) 
conf.set("hbase.zookeeper.quorum", zkQuorum) 
conf.set("hbase.master", masterAddr) 
conf.set("hbase.zookeeper.property.clientPort", portNum) 
conf.set(TableInputFormat.INPUT_TABLE, tableName) 
conf.set(TableInputFormat.SCAN_COLUMNS, cfName + ":" + colName) 

val HbaseRDD = ssc.sparkContext.newAPIHadoopRDD(conf, 
     classOf[TableInputFormat], 
     classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], 
     classOf[org.apache.hadoop.hbase.client.Result]) 

通過這種方式,我能夠與特定列家人和列名只有一次獲得的記錄的值。通過只說一次,我的意思是每次我開始我的spark-streaming應用程序,這段代碼將被執行,我可以得到一個值,但它不會再執行。因爲每次我收到卡夫卡的記錄時,我想用cf和列從HBase中讀取我的記錄,這對我來說不起作用。

爲了解決這個問題,我將邏輯移動到foreachRDD(),但不幸的是sparkContext看起來不可序列化。我得到了像task is not serialzable這樣的錯誤。

最後,我發現有另一種方法可以使用hbase.clinet HTable從hbase讀取數據。所以這是我最後的工作:

def transferToHBasePut(line: String): (ImmutableBytesWritable, Put) = { 
    val conf = HBaseConfiguration.create() 
    conf.set("zookeeper.znode.parent", "/hbase-secure") 
    conf.set("hbase.zookeeper.quorum", "xxxxxx") 
    conf.set("hbase.master", "xxxx") 
    conf.set("hbase.zookeeper.property.clientPort", "xxx") 
    conf.set(TableInputFormat.INPUT_TABLE, "xx") 
    conf.set(TableInputFormat.SCAN_COLUMNS, "xxxxx") 

    val testTable = new HTable(conf, "testTable") 
    val scan = new Scan 
    scan.addColumn("cf1".getBytes, "test".getBytes) 
    val rs = testTable.getScanner(scan) 

    var r = rs.next() 
    val res = new StringBuilder 
    while(r != null){ 
     val tmp = new String(r.getValue("cf1".getBytes, "test".getBytes)) 

     res.append(tmp) 
     r= rs.next() 
    } 
val res = res.toString 

//do the following manipulations and return object (ImmutableBytesWritable, Put) 
     .............................. 
     ....................... 
      } 

在main方法我在foreachRDD上述方法使用和保存到HBase的使用方法saveAsNewAPIHadoopDataset

streamData.foreachRDD(stream => stream.map (transferToHBasePut).saveAsNewAPIHadoopDataset(job.getConfiguration)) 

這對我現在的工作很好,但我有疑問關於這個過程:

以這種方式,我猜想,對於RDD的每個分區,都會創建一個到HBase的連接。我想知道是否可以擴大我的應用程序。假如我在1秒內有超過1000條記錄,看起來在我的火花流式傳輸中會建立1000個連接。

這是從hbase讀取數據的正確方法嗎?在sparkStreaming中從HBase讀取數據的最佳實踐是什麼?或者火花流不應該讀取任何數據,它只是設計爲將數據流寫入數據庫。

在此先感謝。

回答

0

foreachRDD在單個執行程序jvm進程上執行。至少你可以在transferToHBasePut方法中獲得conf的單例實例(意味着在使用jvm進程或新conf的現有set conf之前進行空檢查)。所以這會將Hbase連接的數量減少到Spark集羣中產生的執行程序的數量。

希望這有助於...

+0

謝謝你回答我的問題。我通過將conf作爲參數傳遞給方法transferToHBasePut來嘗試解決方案。但正如你所說的foreach在每個執行者jvm進程上執行,單身人士不能從司機轉移到工人。我認爲這是因爲配置不可分割。最後我發現有一種叫做foreachPartition的方法可以用於RDD。該方法將保證每個RDD分區只建立一次連接。 – Frankie

3

一些學習之後,我創建RDD的每個分區的配置。檢查foreachRDD的設計模式Spark Streaming official website。實際上配置不是連接,所以我不知道如何從現有連接池獲取連接並獲取Hbase的記錄。

+0

您是否已經通過Spark流向HBase進行了讀取?我只能通過打開每個數據的連接來閱讀它。有什麼辦法做到這一點? – zorkaya

相關問題