2017-04-20 171 views
0

我想流數據插入到hbase; 這是我的代碼:火花流HBase的錯誤

val tableName = "streamingz" 
val conf = HBaseConfiguration.create() 
conf.addResource(new Path("file:///opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/etc/hbase/conf.dist/hbase-site.xml")) 
conf.set(TableInputFormat.INPUT_TABLE, tableName) 

val admin = new HBaseAdmin(conf) 
if (!admin.isTableAvailable(tableName)) { 
    print("-----------------------------------------------------------------------------------------------------------") 
    val tableDesc = new HTableDescriptor(tableName) 
    tableDesc.addFamily(new HColumnDescriptor("z1".getBytes())) 
    tableDesc.addFamily(new HColumnDescriptor("z2".getBytes())) 
    admin.createTable(tableDesc) 
} else { 
    print("Table already exists!!--------------------------------------------------------------------------------------") 
} 
val ssc = new StreamingContext(sc, Seconds(10)) 
val topicSet = Set("fluxAstellia") 
val kafkaParams = Map[String, String]("metadata.broker.list" - > "10.32.201.90:9092") 
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet) 
val lines = stream.map(_._2).map(_.split(" ", -1)).foreachRDD(rdd => { 
    if (!rdd.partitions.isEmpty) { 
     val myTable = new HTable(conf, tableName) 
     rdd.map(rec => { 
      var put = new Put(rec._1.getBytes) 
      put.add("z1".getBytes(), "name".getBytes(), Bytes.toBytes(rec._2)) 
      myTable.put(put) 
     }).saveAsNewAPIHadoopDataset(conf) 
     myTable.flushCommits() 
    } else { 
     println("rdd is empty") 
    } 

}) 


ssc.start() 
ssc.awaitTermination() 

} 
} 

我得到這個錯誤:

:66: error: value _1 is not a member of Array[String] 
     var put = new Put(rec._1.getBytes) 

我初學者我這樣怎麼能不修復這個錯誤,我有一個問題:

哪裏準確地創建表格;流媒體過程之外還是內部?

謝謝

回答

0

你的錯誤,基本上就行var put = new Put(rec._1.getBytes) 您可以撥打_n僅在地圖(_1關鍵和_2值)或元組。
rec是你通過空格字符流中分割字符串得到的字符串數組。如果你是在第一個元素之後,你會把它寫成var put = new Put(rec(0).getBytes)。同樣,在下一行,你會寫爲put.add("z1".getBytes(), "name".getBytes(), Bytes.toBytes(rec(1)))

+0

什麼有關創建HBase的表。我應該在哪裏創建它? –

+0

我得到這個新的錯誤哥'錯誤的jobscheduler:錯誤運行的工作流的工作1492790490000 ms.0 org.apache.spark.SparkException:任務不serializable' –

+0

它應該告訴你的是什麼類沒有在堆棧跟蹤序列化。不管你的map()關閉是什麼,都應該是可序列化的。我的猜測是HTable不可序列化。您可以使它序列化與'VAL myTable的=新HTable(CONF,表名)與java.io.Serializable'更換線或者是將其標記爲'@Transient lazy'所以每個執行人如果那是創建自己的實例你想做。 – sparker