Spark Streaming insert into HBase with Scala — inserts not happening
I want to read records from Kafka messages and put them into HBase. Although the Scala script runs without any errors, the inserts never happen. Please help.
Input: rowkey1,1 rowkey2,2
Here is the code I am using:
object Blaher {
  def blah(row: Array[String]) {
    val hConf = new HBaseConfiguration()
    val hTable = new HTable(hConf, "test")
    val thePut = new Put(Bytes.toBytes(row(0)))
    thePut.add(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(row(1)))
    hTable.put(thePut)
  }
}

object TheMain extends Serializable {
  def run() {
    val ssc = new StreamingContext(sc, Seconds(1))
    val topicmap = Map("test" -> 1)
    val lines = KafkaUtils.createStream(ssc, "127.0.0.1:2181", "test-consumer-group", topicmap).map(_._2)
    val words = lines.map(line => line.split(",")).map(line => (line(0), line(1)))
    val store = words.foreachRDD(rdd => rdd.foreach(Blaher.blah))
    ssc.start()
  }
}
TheMain.run()
How many cores did you allocate to Spark when you created the SparkContext (sc)? – maasg 2014-09-30 18:46:13
It looks like the problem is in converting the RDD to an array. Somehow the foreachRDD call to the Blaher.blah method is incorrect. Is there any way to pass the records as arrays and insert them into HBase? – 2014-11-08 11:44:07
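A hedged sketch of a revised run(), addressing the two most likely causes raised above: the streaming context is started but never awaited (so the driver can exit before any batch runs), and a receiver-based stream needs at least two cores (one for the Kafka receiver, one for processing). It also moves HTable creation to once per partition instead of once per record, since HTable is not serializable and per-record construction is expensive. This assumes the Spark 1.x streaming API and the classic HBase client API used in the original post; the master URL and app name are illustrative assumptions.

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object TheMain {
  def run(): Unit = {
    // local[2]: at least one core for the Kafka receiver, one for processing
    val conf = new SparkConf().setAppName("KafkaToHBase").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    val lines = KafkaUtils.createStream(
      ssc, "127.0.0.1:2181", "test-consumer-group", Map("test" -> 1)).map(_._2)

    lines.map(_.split(",")).foreachRDD { rdd =>
      // Create the HTable on the executor side, once per partition;
      // HTable is not serializable, so it cannot be shipped from the driver.
      rdd.foreachPartition { rows =>
        val hConf = HBaseConfiguration.create()
        val hTable = new HTable(hConf, "test")
        rows.foreach { row =>
          val thePut = new Put(Bytes.toBytes(row(0)))
          thePut.add(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(row(1)))
          hTable.put(thePut)
        }
        hTable.flushCommits() // ensure buffered puts reach the region server
        hTable.close()
      }
    }

    ssc.start()
    ssc.awaitTermination() // without this the driver may exit before any batch runs
  }
}
```

Running this requires a live ZooKeeper/Kafka broker and an HBase table named "test" with column family "cf", as in the original question.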