2017-04-06 82 views
0

這很簡單「如何」問題:: 我們可以通過com.databricks.spark.csv將數據帶到Spark環境。我知道如何通過spark創建HBase表,並手動將數據寫入HBase表。但是,甚至可以通過Spark將文本/ csv/jason文件直接加載到HBase?我看不到有人在談論它。所以,只是檢查。如果可能的話,請指導我一個很好的網站,詳細解釋scala代碼以完成它。通過Spark加載csv文件到HBase

謝謝

回答

0

有多種方法可以做到這一點。

  1. 星火HBase的連接器:

https://github.com/hortonworks-spark/shc

你可以看到很多的例子的鏈接。

  1. 此外,您可以使用SPark核心使用HbaseConfiguration將數據加載到Hbase。

代碼示例:

val fileRDD = sc.textFile(args(0), 2) 
    val transformedRDD = fileRDD.map { line => convertToKeyValuePairs(line) } 

    val conf = HBaseConfiguration.create() 
    conf.set(TableOutputFormat.OUTPUT_TABLE, "tableName") 
    conf.set("hbase.zookeeper.quorum", "localhost:2181") 
    conf.set("hbase.master", "localhost:60000") 
    conf.set("fs.default.name", "hdfs://localhost:8020") 
    conf.set("hbase.rootdir", "/hbase") 

    val jobConf = new Configuration(conf) 
    jobConf.set("mapreduce.job.output.key.class", classOf[Text].getName) 
    jobConf.set("mapreduce.job.output.value.class", classOf[LongWritable].getName) 
    jobConf.set("mapreduce.outputformat.class", classOf[TableOutputFormat[Text]].getName) 

    transformedRDD.saveAsNewAPIHadoopDataset(jobConf) 



def convertToKeyValuePairs(line: String): (ImmutableBytesWritable, Put) = { 

    val cfDataBytes = Bytes.toBytes("cf") 
    val rowkey = Bytes.toBytes(line.split("\\|")(1)) 
    val put = new Put(rowkey) 

    put.add(cfDataBytes, Bytes.toBytes("PaymentDate"), Bytes.toBytes(line.split("|")(0))) 
    put.add(cfDataBytes, Bytes.toBytes("PaymentNumber"), Bytes.toBytes(line.split("|")(1))) 
    put.add(cfDataBytes, Bytes.toBytes("VendorName"), Bytes.toBytes(line.split("|")(2))) 
    put.add(cfDataBytes, Bytes.toBytes("Category"), Bytes.toBytes(line.split("|")(3))) 
    put.add(cfDataBytes, Bytes.toBytes("Amount"), Bytes.toBytes(line.split("|")(4))) 
    return (new ImmutableBytesWritable(rowkey), put) 
    } 
  • 您也可以使用這一個
  • https://github.com/nerdammer/spark-hbase-connector