2016-06-28 218 views
0

我正在使用nerdammer hbase spark連接器並讀取兩個hbase表,因爲RDD會將它們轉換爲數據框並運行SQL以加入它的預期工作。Hbase Spark RDD JSON列

其中一個表中的一列中有JSON對象我需要在最終結果中提取特定的JSON屬性值這怎麼可能。 如果我在ARDD的D列中有Json數據,例如[{「foo」:「bar」,「baz」:「qux」}],我需要創建新的RDD或DF,其值僅爲「baz」列,以便最終當我連接時,我只獲得這個屬性的值。

val ARDD = sc.hbaseTable[(Option[String], Option[String], Option[String], Option[String], Option[String],Option[String])](ATableName) 
     .select("A","B","C","D","E").inColumnFamily("pCF") 

     val BRDD = sc.hbaseTable[(Option[String],Option[String], Option[String], Option[String], Option[String], Option[String],Option[String])](BTableName) 
     .select("A","B","C","D","E","F").inColumnFamily("tCF") 


    val ADF = sqlContext.createDataFrame(ARDD).registerTempTable("aDF") 
    val BDF = sqlContext.createDataFrame(BRDD).registerTempTable("bDF") 

val resultset = sqlContext.sql("SELECT aDF._1, bDF._2, bDF._3, bDF._4, bDF._5, bDF._6, bDF._3, aDF._1, aDF._2, bDF._1 FROM aDF, bDFWHERE aDF._5 = bDF._7").collect() 

val joinedResult = resultset.foreach(println) 
    println("Count " + joinedResult) 

回答

0

創建一個UDF來實現這一目標,在我的DF擁有該分析信息創建新的列

import org.json4s.jackson.JsonMethods._ 
import org.apache.spark.sql.functions.udf 
def udfScoreToCategory=udf((t: String) => { 
    compact((parse(t.toString,true) \ "myField"))}) 


val abc= myDF.withColumn("_p", udfScoreToCategory(myDF("_4"))).registerTempTable("odDF")