我正在編寫一個項目,接收來自Kafka的數據並寫入Hbase表。因爲我想知道記錄的差異,所以我需要先用Hbase中的相同rowkey獲取記錄,然後對接收到的記錄進行相減,最後將新記錄保存到HBase表中。在Spark Streaming中讀取Hbase數據
在開始時,我試着用newAPIHadoop
從hbase獲取數據。這裏是我的嘗試:
val conf = HBaseConfiguration.create()
conf.set("zookeeper.znode.parent", "/hbase-secure")
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
conf.set("hbase.zookeeper.quorum", zkQuorum)
conf.set("hbase.master", masterAddr)
conf.set("hbase.zookeeper.property.clientPort", portNum)
conf.set(TableInputFormat.INPUT_TABLE, tableName)
conf.set(TableInputFormat.SCAN_COLUMNS, cfName + ":" + colName)
val HbaseRDD = ssc.sparkContext.newAPIHadoopRDD(conf,
classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
通過這種方式,我能夠與特定列家人和列名只有一次獲得的記錄的值。通過只說一次,我的意思是每次我開始我的spark-streaming應用程序,這段代碼將被執行,我可以得到一個值,但它不會再執行。因爲每次我收到卡夫卡的記錄時,我想用cf和列從HBase中讀取我的記錄,這對我來說不起作用。
爲了解決這個問題,我將邏輯移動到foreachRDD()
,但不幸的是sparkContext看起來不可序列化。我得到了像task is not serialzable
這樣的錯誤。
最後,我發現有另一種方法可以使用hbase.clinet HTable從hbase讀取數據。所以這是我最後的工作:
def transferToHBasePut(line: String): (ImmutableBytesWritable, Put) = {
val conf = HBaseConfiguration.create()
conf.set("zookeeper.znode.parent", "/hbase-secure")
conf.set("hbase.zookeeper.quorum", "xxxxxx")
conf.set("hbase.master", "xxxx")
conf.set("hbase.zookeeper.property.clientPort", "xxx")
conf.set(TableInputFormat.INPUT_TABLE, "xx")
conf.set(TableInputFormat.SCAN_COLUMNS, "xxxxx")
val testTable = new HTable(conf, "testTable")
val scan = new Scan
scan.addColumn("cf1".getBytes, "test".getBytes)
val rs = testTable.getScanner(scan)
var r = rs.next()
val res = new StringBuilder
while(r != null){
val tmp = new String(r.getValue("cf1".getBytes, "test".getBytes))
res.append(tmp)
r= rs.next()
}
val res = res.toString
//do the following manipulations and return object (ImmutableBytesWritable, Put)
..............................
.......................
}
在main方法我在foreachRDD上述方法使用和保存到HBase的使用方法saveAsNewAPIHadoopDataset
streamData.foreachRDD(stream => stream.map (transferToHBasePut).saveAsNewAPIHadoopDataset(job.getConfiguration))
這對我現在的工作很好,但我有疑問關於這個過程:
以這種方式,我猜想,對於RDD的每個分區,都會創建一個到HBase的連接。我想知道是否可以擴大我的應用程序。假如我在1秒內有超過1000條記錄,看起來在我的火花流式傳輸中會建立1000個連接。
這是從hbase讀取數據的正確方法嗎?在sparkStreaming中從HBase讀取數據的最佳實踐是什麼?或者火花流不應該讀取任何數據,它只是設計爲將數據流寫入數據庫。
在此先感謝。
謝謝你回答我的問題。我通過將conf作爲參數傳遞給方法transferToHBasePut來嘗試解決方案。但正如你所說的foreach在每個執行者jvm進程上執行,單身人士不能從司機轉移到工人。我認爲這是因爲配置不可分割。最後我發現有一種叫做foreachPartition的方法可以用於RDD。該方法將保證每個RDD分區只建立一次連接。 – Frankie