2014-09-30 21 views
0

我想從Kafka消息中讀取記錄並將其放入Hbase。雖然scala腳本沒有任何問題,插入不會發生。請幫幫我。使用Scala插入到Hbase的Spark Streaming問題

輸入: rowkey1,1 rowkey2,2

這裏是我使用的代碼:

object Blaher { 
    def blah(row: Array[String]) { 
    val hConf = new HBaseConfiguration() 
    val hTable = new HTable(hConf, "test") 
    val thePut = new Put(Bytes.toBytes(row(0))) 
    thePut.add(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(row(1))) 
    hTable.put(thePut) 
    } 
} 


object TheMain extends Serializable{ 
    def run() { 
    val ssc = new StreamingContext(sc, Seconds(1)) 
    val topicmap = Map("test" -> 1) 
    val lines = KafkaUtils.createStream(ssc,"127.0.0.1:2181", "test-consumer-group",topicmap).map(_._2) 
    val words = lines.map(line => line.split(",")).map(line => (line(0),line(1))) 
    val store = words.foreachRDD(rdd => rdd.foreach(Blaher.blah)) 
    ssc.start() 
    } 
} 

TheMain.run() 
+0

當您創建SparkContext時,您分配給Spark的核心數量是多少? (sc)? – maasg 2014-09-30 18:46:13

+0

看起來問題是將rdd轉換爲數組。 Somehome foreach rdd對Blaher.blah方法的調用不正確。無論如何要將記錄作爲數組傳遞並將它們插入到hbase中? – 2014-11-08 11:44:07

回答

0

從API文檔爲HTableflushCommits()方法:「執行所有的緩衝投入業務「。您應該在blah()方法的末尾調用它 - 它看起來像是當前正在緩衝但從未執行或在某個隨機時間執行。