2014-12-02 142 views
2

我可以使用下面的代碼讀卡夫卡消息:星火RDD寫入HBase的

val ssc = new StreamingContext(sc, Seconds(50)) 
val topicmap = Map("test" -> 1) 
val lines = KafkaUtils.createStream(ssc,"127.0.0.1:2181", "test-consumer-group",topicmap) 

能否請你幫我,我怎麼能寫卡夫卡讀取到HBase的每封郵件?這是我想寫的,但沒有成功。

lines.foreachRDD(rdd => { 
    rdd.foreach(record => { 
    val i = +1 
    val hConf = new HBaseConfiguration() 
    val hTable = new HTable(hConf, "test") 
    val thePut = new Put(Bytes.toBytes(i)) 
    thePut.add(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(record)) 
    }) 
}) 

回答

5

那麼,你實際上並沒有執行Put,你是Mereley創建一個Put請求並向它添加數據。你缺少的是一個

hTable.put(thePut); 
+0

我的錯誤。我錯過了,儘管我把它作爲我的代碼的一部分。這裏的問題是:如何將每條消息從Kafka傳遞給Hbase?我能夠讀取來自Kafka的消息,但是在foreachRDD中無法將該消息移植到Hbase。打電話給foreachRDD我做錯了嗎? – 2014-12-04 11:53:26