
I am unable to write data to HBase using the example from the README. Below is a simple piece of code that shows my approach and the error I run into when writing data to HBase from a Spark Scala DataFrame.

import org.apache.spark.sql.execution.datasources.hbase._

// In spark-shell; toDF on a Seq needs the session implicits.
import spark.implicits._

val input = Seq(
    ("a:1", "null", "null", "3", "4", "5", "6"),
    ("b:1", "2", "3", "null", "null", "5", "6")
)

// Name the columns so they match the catalog below; a bare toDF would
// produce _1 ... _7 and the catalog lookup would fail.
val df = input.toDF("col0", "col1", "col2", "col3", "col4", "col5", "col6")

val TIMELINE_TABLE = "test_timeline"

val timelineCatalog = s"""{
         |"table":{"namespace":"default", "name":"$TIMELINE_TABLE", "tableCoder":"PrimitiveType"},
         |"rowkey":"key",
         |"columns":{
         |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
         |"col1":{"cf":"main", "col":"kx", "type":"string"},
         |"col2":{"cf":"main", "col":"ky", "type":"string"},
         |"col3":{"cf":"main", "col":"rx", "type":"string"},
         |"col4":{"cf":"main", "col":"ry", "type":"string"},
         |"col5":{"cf":"main", "col":"wx", "type":"string"},
         |"col6":{"cf":"main", "col":"wy", "type":"string"}
         |}
         |}""".stripMargin

val HBASE_CONF_FILE = "/etc/hbase/conf/hbase-site.xml"

df.write
  .options(Map(HBaseTableCatalog.tableCatalog -> timelineCatalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()

java.lang.ClassCastException: org.json4s.JsonAST$JString cannot be cast to org.json4s.JsonAST$JObject 
    at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:257) 
    at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:77) 
    at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:59) 
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:518) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215) 
    ... 50 elided 
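
The trace fails inside HBaseTableCatalog.apply, i.e. while json4s parses the catalog string, before any data or HBase connection is involved. One way to narrow this down (a sanity-check sketch, not something shc requires) is to parse the catalog yourself with json4s, which Spark already ships:

import org.json4s._
import org.json4s.jackson.JsonMethods._

// A malformed catalog throws (or yields the wrong JSON shape) right here.
parse(timelineCatalog) \ "table"   // should be a JObject, not a JString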

My Scala version is 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131), with Spark 2.1.0.2.6.0.10-29 and HBase 1.1.2, and I am using the hortonworks-spark/shc connector. I can't find anything wrong with my data; it is actually quite clean. Ideally I would like the "null" strings to be actual nulls, but I couldn't get that to work, so I figured writing them as strings would at least make the save go through. Any help would be appreciated. I have also opened an issue on GitHub: https://github.com/hortonworks-spark/shc/issues/172
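
On the "actual null" point: one way to get real SQL nulls on the Spark side is to build the DataFrame from Option[String] values instead of the literal string "null". This is a minimal sketch (the names are mine, and whether shc then accepts null cells on write is exactly the part I could not confirm):

// Option[String] columns become nullable string columns; None ends up
// as a real SQL null rather than the four-character string "null".
val typedInput = Seq(
    ("a:1", None,      None,      Some("3"), Some("4"), Some("5"), Some("6")),
    ("b:1", Some("2"), Some("3"), None,      None,      Some("5"), Some("6"))
)
val nullableDf = typedInput.toDF("col0", "col1", "col2", "col3", "col4", "col5", "col6")
nullableDf.show()   // None cells print as null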

Answer

Although I still haven't found a way to get hortonworks-spark/shc working, here is an alternative way of doing the same thing:

import org.apache.hadoop.hbase.client.Put 
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable 
import org.apache.hadoop.hbase.util.Bytes 
import org.apache.hadoop.mapreduce.Job 
import org.apache.hadoop.hbase.HBaseConfiguration 
import org.apache.spark.sql.SparkSession

// In spark-shell this session already exists as `spark`.
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val input = Seq(
    ("a:1", "null", "null"), 
    ("b:1", "2", "3") 
) 

val df = input.toDF("col0", "col1", "col2") 

val TIMELINE_TABLE = "prod_timeline" 


// Point the HBase client at ZooKeeper and name the output table.
val config = HBaseConfiguration.create()
config.clear()

config.set("hbase.zookeeper.quorum", "zk0.example.net")
config.set("zookeeper.znode.parent", "/hbase")
config.set("hbase.zookeeper.property.clientPort", "2181")
config.set(TableOutputFormat.OUTPUT_TABLE, TIMELINE_TABLE)

// Build one Put per row: column 0 is the row key, the remaining
// columns go into the "main" column family.
val rdd = df.rdd.map { x =>
    val rowkey = x.get(0).toString
    val p = new Put(Bytes.toBytes(rowkey))
    p.addColumn(Bytes.toBytes("main"), Bytes.toBytes("a"), Bytes.toBytes(x.get(1).toString))
    p.addColumn(Bytes.toBytes("main"), Bytes.toBytes("b"), Bytes.toBytes(x.get(2).toString))

    (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), p)
}

val newAPIJobConfiguration = Job.getInstance(config)
newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, TIMELINE_TABLE)
newAPIJobConfiguration.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration())
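
To check that the rows actually landed, the same config can be reused for a read-back (a quick sketch, not part of the original write path; TableInputFormat is the read-side counterpart of TableOutputFormat):

import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.client.Result

// Scan the table back as an RDD and print the row keys just written.
config.set(TableInputFormat.INPUT_TABLE, TIMELINE_TABLE)
val readBack = spark.sparkContext.newAPIHadoopRDD(
    config,
    classOf[TableInputFormat],
    classOf[ImmutableBytesWritable],
    classOf[Result])
readBack.map { case (k, _) => Bytes.toString(k.get()) }.collect().foreach(println)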