2016-10-07 37 views
2

我正在閱讀Spark的dynamodb表,該表在一個字段中包含一個JSON字符串,在其他字段中包含字符串。我能夠讀取JSON字段,但不能讀取嵌套的JSON字段。這不是query Json Column using dataframes的複製。這個問題解釋瞭如何從JSON字符串中提取列,但不是嵌套JSON列。從Sparko的DynamoDB JSON字符串中提取嵌套的Json字段?

import com.github.traviscrawford.spark.dynamodb._ 
val users = sqlContext.read.dynamodb("Dynamodb_table") 

users.show(1)

試樣數據集合

|col1              | ID | field2|field3| 
------------------------------------------------------------------------------------- 
|{"a":[{"b":"value1","x":23},{"b":value2,"x":52}],"c":"valC"}|A1 | X1 |Y1 | 

我需要從COL1(JSON結構)和ID字段提取幾個字段。我能夠弄清楚如何解析JSON字段(col1)並從col1獲取字段'c',如here所解釋的,但無法提取嵌套字段。

我的代碼:

val users = sqlContext.read.dynamodb("Dynamodb_table") 
val data = users.selectExpr("get_json_object(col1, '$.c')","get_json_object(col1, '$.a')","ID") 

data.show(1,false) 
|a            |c |ID| 
--------------------------------------------------------- 
|[{"b":"value1","x":23},{"b":value2","x":52}...]|valC|A1| 

現在,當我嘗試運用上面的數據幀在同get_json_object,我得到的所有空值。

val nestedData = data.selectExpr("get_json_object(a, '$.b')","c","ID") 
nestedData.show(false) 

|get_json_object(a, '$.b')| c | ID| 
------------------------------------ 
|null      |valC|A1 |  

我試過爆炸以及col'a'有數組和結構。但是,這並不奏效,因爲數據框'data'將col/field'a'作爲字符串而不是數組返回。任何想法如何解決這個問題?

更新:我也嘗試使用JSON4s和net.liftweb.json.parse解析。這也沒有幫助

case class aInfo(b: String) 
case class col1(a: Option[aInfo]), c: String) 

import net.liftweb.json.parse 
val parseJson = udf((data: String) => { 
implicit val formats = net.liftweb.json.DefaultFormats 
parse(data).extract[Data] 
}) 

val parsed = users.withColumn("parsedJSON", parseJson($"data")) 
parsed.show(1) 

當我使用這些解析器時,所有值都爲空。

我預期的結果:我想從數據集

|b  |x |c | ID| 
-------------------- 
|value1|23|valC|A1 | 
|value2|52|valC|A1 | 

回答

1

我相信所有所需的拼圖碎片已經在這裏獲得了扁平結構讓我們遵循一步一步來。您的數據等同於:

val df = Seq((
    """{"a":[{"b":"value1"},{"b": "value2"}],"c":"valC"}""", "A1", "X1", "Y1" 
)).toDF("col1", "ID", "field2", "field3") 

星火提供了實現相同的查詢API作爲提升json4s:

import org.json4s._ 
import org.json4s.jackson.JsonMethods._ 

,我們可以使用例如LINQ風格的API來定義UDF:

val getBs = udf((s: String) => for { 
    JString(b) <- parse(s) \ "a" \ "b" 
} yield b) 

如果你想提取多個字段,你當然可以擴展它。例如,如果JSON字符串有多個領域

{"a":[{"b":"value1","d":1},{"b":"value2","d":2}],"c":"valC"} 

,您可以:

for { 
    JObject(a) <- parse(s) \ "a" 
    JField("b", JString(b)) <- a 
    JField("d", JInt(d)) <- a 
} yield (b, d) 

這是假定這兩個領域都存在,否則也不會有比賽。爲了處理丟失的字段,你可能更喜歡XPath-like表達式或提取:

case class A(b: Option[String], d: Option[Int]) 

(parse(s) \ "a").extract(Seq[A]) 

UDF這樣可以與explode使用提取領域:

val withBs = df.withColumn("b", explode(getBs($"col1"))) 

與結果:

+--------------------+---+------+------+------+ 
|    col1| ID|field2|field3|  b| 
+--------------------+---+------+------+------+ 
|{"a":[{"b":"value...| A1| X1| Y1|value1| 
|{"a":[{"b":"value...| A1| X1| Y1|value2| 
+--------------------+---+------+------+------+ 

你嘗試使用電梯是不正確的,因爲您預計aaInfo的序列,但將其僅定義爲Option[aInfo]。它應該是Option[Seq[aInfo]]

case class col1(a: Option[Seq[aInfo]], c: String) 

隨着類中定義這樣的分析應該沒有問題的工作。

如果使用當前生成(火花2.1.0)存在由SPARK-17699其中引入了from_json方法需要模式:

import org.apache.spark.sql.types._ 

val bSchema = StructType(Seq(StructField("b", StringType, true))) 
val aSchema = StructField("a", ArrayType(bSchema), true) 
val cSchema = StructField("c", StringType, true) 

val schema = StructType(Seq(aSchema, cSchema)) 

,並且可以被應用爲:

import org.apache.spark.sql.functions.from_json 

val parsed = df.withColumn("col1", from_json($"col1", schema)) 

後您可以使用常用符號選擇字段:

parsed.select($"col1.a.b")