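Flatten JSON into a tabular structure using Spark-Scala, RDD-only functions
I have nested JSON and want to output it in a tabular structure. I can parse the individual JSON values, but I am having trouble tabulating them. I could do this easily with a DataFrame, but I want to do it using RDD-only functions. Any help is much appreciated.
Input JSON: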
{ "level":{"productReference":{
"prodID":"1234",
"unitOfMeasure":"EA"
},
"states":[
{
"state":"SELL",
"effectiveDateTime":"2015-10-09T00:55:23.6345Z",
"stockQuantity":{
"quantity":1400.0,
"stockKeepingLevel":"A"
}
},
{
"state":"HELD",
"effectiveDateTime":"2015-10-09T00:55:23.6345Z",
"stockQuantity":{
"quantity":800.0,
"stockKeepingLevel":"B"
}
}
] }}
Expected output:
I tried the Spark code below, but I get output like the following, and a Row() object cannot represent it:
079562193 EA,List(SELLABLE,HELD),List(2015-10-09T00:55:23.6345Z,2015-10-09T00:55:23.6345Z),List(1400.0,800.0),List(SINGLE,SINGLE)
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("JSON Read and Write using Spark RDD").setMaster("local[1]")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  // Target schema: one flat record per (product, state) pair, all columns as strings.
  val salesSchema = StructType(Array(
    StructField("prodID", StringType, true),
    StructField("unitOfMeasure", StringType, true),
    StructField("state", StringType, true),
    StructField("effectiveDateTime", StringType, true),
    StructField("quantity", StringType, true),
    StructField("stockKeepingLevel", StringType, true)
  ))
  // textFile yields one element per line, so each JSON message must sit on a single line.
  val ReadAlljsonMessageInFile_RDD = sc.textFile("product_rdd.json")
  val x = ReadAlljsonMessageInFile_RDD.map(eachJsonMessages => {
    parse(eachJsonMessages)
  }).map(insideEachJson => {
    implicit val formats = org.json4s.DefaultFormats
    val prodID = (insideEachJson \ "level" \ "productReference" \ "prodID").extract[String]
    val unitOfMeasure = (insideEachJson \ "level" \ "productReference" \ "unitOfMeasure").extract[String]
    // Each value below is a whole List turned into one String, which is why the output shows List(...).
    val state = (insideEachJson \ "level" \ "states").extract[List[JValue]].
      map(x => (x \ "state").extract[String]).toString
    val effectiveDateTime = (insideEachJson \ "level" \ "states").extract[List[JValue]].
      map(x => (x \ "effectiveDateTime").extract[String]).toString
    val quantity = (insideEachJson \ "level" \ "states").extract[List[JValue]].
      map(x => (x \ "stockQuantity").extract[JValue]).map(x => (x \ "quantity").extract[Double]).
      toString
    val stockKeepingLevel = (insideEachJson \ "level" \ "states").extract[List[JValue]].
      map(x => (x \ "stockQuantity").extract[JValue]).map(x => (x \ "stockKeepingLevel").extract[String]).
      toString
    // Row(prodID, unitOfMeasure, state, effectiveDateTime, quantity, stockKeepingLevel)
    // println returns Unit, so x ends up as an Array[Unit] rather than a collection of rows.
    println(prodID, unitOfMeasure, state, effectiveDateTime, quantity, stockKeepingLevel)
  }).collect()
  // sqlContext.createDataFrame(x, salesSchema).show(truncate = false)
}
What problem is stopping it from working? Without seeing the relevant exception or compiler error, it is hard to diagnose the problem. – Phasmid
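Thanks for looking at the question. The Row object cannot represent the result, which is why I commented it out and used a print statement instead. The schema I defined and the Row() object I am passing do not match, so I would appreciate any help fixing this. –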
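One possible way to get the Row() objects to line up with the schema is to emit one Row per entry of "states" instead of one stringified List per column. The following is only a minimal sketch, not a tested answer for the real data: the object name FlattenJsonRdd and the helper flatten are made up for illustration, the field names are taken from the sample JSON above, and it assumes each JSON message sits on a single line of product_rdd.json (the sample is shown pretty-printed, which textFile would split line by line).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object FlattenJsonRdd {
  // Hypothetical helper: turns one JSON message into one Row per element of "states".
  def flatten(json: String): List[Row] = {
    implicit val formats: Formats = DefaultFormats
    val level = parse(json) \ "level"
    // Product-level fields are repeated on every output row.
    val prodID = (level \ "productReference" \ "prodID").extract[String]
    val unitOfMeasure = (level \ "productReference" \ "unitOfMeasure").extract[String]
    // children returns the array elements (an empty list if "states" is absent).
    (level \ "states").children.map { s =>
      Row(
        prodID,
        unitOfMeasure,
        (s \ "state").extract[String],
        (s \ "effectiveDateTime").extract[String],
        // The schema declares quantity as StringType, so the Double is converted.
        (s \ "stockQuantity" \ "quantity").extract[Double].toString,
        (s \ "stockQuantity" \ "stockKeepingLevel").extract[String]
      )
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Flatten JSON with RDDs").setMaster("local[1]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val salesSchema = StructType(
      Seq("prodID", "unitOfMeasure", "state", "effectiveDateTime", "quantity", "stockKeepingLevel")
        .map(name => StructField(name, StringType, nullable = true))
    )
    // Assumption: one complete JSON message per line of the input file.
    // flatMap spreads the rows of every message into a single RDD[Row].
    val rows = sc.textFile("product_rdd.json").flatMap(flatten)
    rows.collect().foreach(println)
    // Optional: only used to display the already flattened RDD as a table.
    sqlContext.createDataFrame(rows, salesSchema).show(truncate = false)
    sc.stop()
  }
}
```

For the sample input this should produce two rows, one for the SELL state and one for HELD, each with six values in the same order as salesSchema. The flattening itself uses only RDD operations; the createDataFrame call at the end is there purely to print the result and can be dropped.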