2017-05-28 50 views
1

我嵌套了JSON,並希望以表格結構輸出。我能夠單獨解析JSON值,但在製表中存在一些問題。我可以通過數據框輕鬆完成。但我想用「RDD ONLY」功能來做。任何幫助非常感謝。使用Spark-Scala將表格結構壓扁JSON RDD only fucntion

輸入JSON:

{ "level":{"productReference":{ 

    "prodID":"1234", 

    "unitOfMeasure":"EA" 

    }, 

    "states":[ 
    { 
     "state":"SELL", 
     "effectiveDateTime":"2015-10-09T00:55:23.6345Z", 
     "stockQuantity":{ 
      "quantity":1400.0, 
      "stockKeepingLevel":"A" 
     } 
    }, 
    { 
     "state":"HELD", 
     "effectiveDateTime":"2015-10-09T00:55:23.6345Z", 
     "stockQuantity":{ 
      "quantity":800.0, 
      "stockKeepingLevel":"B" 
     } 
    } 
    ] }} 

預期輸出:

enter image description here

我試過下面星火代碼。但獲取像這樣的輸出和Row()對象不能解析這個。

079562193 EA,List(SELLABLE,HELD),List(2015-10-09T00:55:23.6345Z,2015-10-09T00:55:23.6345Z),List(1400.0,800.0),List(SINGLE ,單)

def main(Args : Array[String]): Unit = { 

    val conf = new SparkConf().setAppName("JSON Read and Write using Spark RDD").setMaster("local[1]") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 

    val salesSchema = StructType(Array(
    StructField("prodID", StringType, true), 
    StructField("unitOfMeasure", StringType, true), 
    StructField("state", StringType, true), 
    StructField("effectiveDateTime", StringType, true), 
    StructField("quantity", StringType, true), 
    StructField("stockKeepingLevel", StringType, true) 
)) 

    val ReadAlljsonMessageInFile_RDD = sc.textFile("product_rdd.json") 

    val x = ReadAlljsonMessageInFile_RDD.map(eachJsonMessages => { 

     parse(eachJsonMessages) 

     }).map(insideEachJson=>{ 
     implicit val formats = org.json4s.DefaultFormats 

     val prodID = (insideEachJson\ "level" \"productReference" \"TPNB").extract[String].toString 
     val unitOfMeasure = (insideEachJson\ "level" \ "productReference" \"unitOfMeasure").extract[String].toString 

     val state= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
      map(x=>(x\"state").extract[String]).toString() 
     val effectiveDateTime= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
     map(x=>(x\"effectiveDateTime").extract[String]).toString 
     val quantity= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
     map(x=>(x\"stockQuantity").extract[JValue]).map(x=>(x\"quantity").extract[Double]). 
     toString 
     val stockKeepingLevel= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
     map(x=>(x\"stockQuantity").extract[JValue]).map(x=>(x\"stockKeepingLevel").extract[String]). 
     toString 

     //Row(prodID,unitOfMeasure,state,effectiveDateTime,quantity,stockKeepingLevel) 

    println(prodID,unitOfMeasure,state,effectiveDateTime,quantity,stockKeepingLevel) 

     }).collect() 

    // sqlContext.createDataFrame(x,salesSchema).show(truncate = false) 

} 
+0

什麼問題阻止了它的工作?看到適當的異常,編譯器錯誤,無論如何都很難診斷問題。 – Phasmid

+0

感謝您查看問題。行對象不能表示它。這就是我剛纔把打印語句弄清楚的原因。我定義的Schema和我傳遞的Row()對象不匹配,所以我希望有任何幫助來解決這個問題。 –

回答

0

有2個版本的解決方案,您的問題。

版本1:

def main(Args : Array[String]): Unit = { 

    val conf = new SparkConf().setAppName("JSON Read and Write using Spark RDD").setMaster("local[1]") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 

    val salesSchema = StructType(Array(
    StructField("prodID", StringType, true), 
    StructField("unitOfMeasure", StringType, true), 
    StructField("state", StringType, true), 
    StructField("effectiveDateTime", StringType, true), 
    StructField("quantity", StringType, true), 
    StructField("stockKeepingLevel", StringType, true) 
)) 

    val ReadAlljsonMessageInFile_RDD = sc.textFile("product_rdd.json")  

    val x = ReadAlljsonMessageInFile_RDD.map(eachJsonMessages => { 

    parse(eachJsonMessages) 

    }).map(insideEachJson=>{ 
    implicit val formats = org.json4s.DefaultFormats 

    val prodID = (insideEachJson\ "level" \"productReference" \"prodID").extract[String].toString 
    val unitOfMeasure = (insideEachJson\ "level" \ "productReference" \"unitOfMeasure").extract[String].toString 

    val state= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
     map(x=>(x\"state").extract[String]).toString() 
    val effectiveDateTime= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
    map(x=>(x\"effectiveDateTime").extract[String]).toString 
    val quantity= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
    map(x=>(x\"stockQuantity").extract[JValue]).map(x=>(x\"quantity").extract[Double]). 
    toString 
    val stockKeepingLevel= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
    map(x=>(x\"stockQuantity").extract[JValue]).map(x=>(x\"stockKeepingLevel").extract[String]). 
    toString 

    Row(prodID,unitOfMeasure,state,effectiveDateTime,quantity,stockKeepingLevel) 

    }) 

    sqlContext.createDataFrame(x,salesSchema).show(truncate = false) 

} 

這會給你以下的輸出:

+------+-------------+----------------+----------------------------------------------------------+-------------------+-----------------+ 
|prodID|unitOfMeasure|state   |effectiveDateTime           |quantity   |stockKeepingLevel| 
+------+-------------+----------------+----------------------------------------------------------+-------------------+-----------------+ 
|1234 |EA   |List(SELL, HELD)|List(2015-10-09T00:55:23.6345Z, 2015-10-09T00:55:23.6345Z)|List(1400.0, 800.0)|List(A, B)  | 
+------+-------------+----------------+----------------------------------------------------------+-------------------+-----------------+ 

2版

def main(Args : Array[String]): Unit = { 

    val conf = new SparkConf().setAppName("JSON Read and Write using Spark RDD").setMaster("local[1]") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 

    val salesSchema = StructType(Array(
    StructField("prodID", StringType, true), 
    StructField("unitOfMeasure", StringType, true), 
    StructField("state", ArrayType(StringType, true), true), 
    StructField("effectiveDateTime", ArrayType(StringType, true), true), 
    StructField("quantity", ArrayType(DoubleType, true), true), 
    StructField("stockKeepingLevel", ArrayType(StringType, true), true) 
)) 

    val ReadAlljsonMessageInFile_RDD = sc.textFile("product_rdd.json")  

    val x = ReadAlljsonMessageInFile_RDD.map(eachJsonMessages => { 

    parse(eachJsonMessages) 

    }).map(insideEachJson=>{ 
    implicit val formats = org.json4s.DefaultFormats 

    val prodID = (insideEachJson\ "level" \"productReference" \"prodID").extract[String].toString 
    val unitOfMeasure = (insideEachJson\ "level" \ "productReference" \"unitOfMeasure").extract[String].toString 

    val state= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
     map(x=>(x\"state").extract[String]) 
    val effectiveDateTime= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
    map(x=>(x\"effectiveDateTime").extract[String]) 
    val quantity= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
    map(x=>(x\"stockQuantity").extract[JValue]).map(x=>(x\"quantity").extract[Double]) 
    val stockKeepingLevel= (insideEachJson \ "level" \"states").extract[List[JValue]]. 
    map(x=>(x\"stockQuantity").extract[JValue]).map(x=>(x\"stockKeepingLevel").extract[String]) 

    Row(prodID,unitOfMeasure,state,effectiveDateTime,quantity,stockKeepingLevel) 

    }) 


    sqlContext.createDataFrame(x,salesSchema).show(truncate = false) 

} 

這會給你以下的輸出:

+------+-------------+------------+------------------------------------------------------+---------------+-----------------+ 
|prodID|unitOfMeasure|state  |effectiveDateTime          |quantity  |stockKeepingLevel| 
+------+-------------+------------+------------------------------------------------------+---------------+-----------------+ 
|1234 |EA   |[SELL, HELD]|[2015-10-09T00:55:23.6345Z, 2015-10-09T00:55:23.6345Z]|[1400.0, 800.0]|[A, B]   | 
+------+-------------+------------+------------------------------------------------------+---------------+-----------------+ 

版本1 & 2之間的區別是模式。在版本1中,您正將每一列投射到String,而在版本2中,它們將投射到Array

+0

非常感謝這個解決方案。但是我希望將輸出記錄分成兩行,這是我在問題陳述的「預期輸出:」部分中指定的方式。有沒有什麼辦法可以讓它傳遞Row()對象。您在版本2中提到的輸出,需要再次平鋪,這是可行的。但是我更喜歡扁平行,而將它傳遞給Row()對象。我真的很感謝你對這個問題的興趣 –

0

DataFrameDataSetrddoptimized這裏面有很多的options以儘量達到我們想要的解決方案。

在我看來,DataFrame的開發目的是爲了讓開發人員能夠以表格的形式輕鬆查看數據,以便輕鬆實現邏輯。所以我總是建議用戶使用dataframedataset

談話要少得多,我使用dataframe在下面發佈您的解決方案。一旦你有一個dataframe,切換到rdd是非常容易的。

您所需的解決方案是低於(你必須找到一種方法來讀取json文件作爲其下面json string做:那你:)好運氣的分配)

import org.apache.spark.sql.functions._ 
val json = """ { "level":{"productReference":{ 

        "prodID":"1234", 

        "unitOfMeasure":"EA" 

       }, 

       "states":[ 
        { 
        "state":"SELL", 
        "effectiveDateTime":"2015-10-09T00:55:23.6345Z", 
        "stockQuantity":{ 
         "quantity":1400.0, 
         "stockKeepingLevel":"A" 
        } 
        }, 
        { 
        "state":"HELD", 
        "effectiveDateTime":"2015-10-09T00:55:23.6345Z", 
        "stockQuantity":{ 
         "quantity":800.0, 
         "stockKeepingLevel":"B" 
        } 
        } 
       ] }}""" 

val rddJson = sparkContext.parallelize(Seq(json)) 
var df = sqlContext.read.json(rddJson) 
df = df.withColumn("prodID", df("level.productReference.prodID")) 
    .withColumn("unitOfMeasure", df("level.productReference.unitOfMeasure")) 
    .withColumn("states", explode(df("level.states"))) 
    .drop("level") 
df = df.withColumn("state", df("states.state")) 
    .withColumn("effectiveDateTime", df("states.effectiveDateTime")) 
    .withColumn("quantity", df("states.stockQuantity.quantity")) 
    .withColumn("stockKeepingLevel", df("states.stockQuantity.stockKeepingLevel")) 
    .drop("states") 
df.show(false) 

這會給放出來作爲

+------+-------------+-----+-------------------------+--------+-----------------+ 
|prodID|unitOfMeasure|state|effectiveDateTime  |quantity|stockKeepingLevel| 
+------+-------------+-----+-------------------------+--------+-----------------+ 
|1234 |EA   |SELL |2015-10-09T00:55:23.6345Z|1400.0 |A    | 
|1234 |EA   |HELD |2015-10-09T00:55:23.6345Z|800.0 |B    | 
+------+-------------+-----+-------------------------+--------+-----------------+ 

現在你已經所需的輸出爲dataframe轉換爲rdd只是調用.rdd

df.rdd.foreach(println) 

會給輸出如下

[1234,EA,SELL,2015-10-09T00:55:23.6345Z,1400.0,A] 
[1234,EA,HELD,2015-10-09T00:55:23.6345Z,800.0,B] 

我希望這是有益低於

+0

嗨ramesh,非常感謝。我已經使用數據框完成了這個解決方案。我來自sql背景和daraframe對我來說非常簡單。我想到學習Scala,所以想嘗試使用RDD只有功能:) –

+0

很高興聽到@RohanNayak。那麼你一定找到了更好的解決方案。 :) 爲你感到高興。謝謝你讓我知道。你爲什麼不張貼你的答案,以便我也從中學習。 :) –

+0

嗨,我發佈了我的解決方案。 :) –

1

HI是「數據框」這是我開發的唯一解決方案。尋找完整的「RDD ONLY」解決方案

 
def main (Args : Array[String]):Unit = { 

    val conf = new SparkConf().setAppName("JSON Read and Write using Spark DataFrame few more options").setMaster("local[1]") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 

    val sourceJsonDF = sqlContext.read.json("product.json") 

     val jsonFlatDF_level = sourceJsonDF.withColumn("explode_states",explode($"level.states")) 
     .withColumn("explode_link",explode($"level._link")) 
     .select($"level.productReference.TPNB".as("TPNB"), 
     $"level.productReference.unitOfMeasure".as("level_unitOfMeasure"), 
     $"level.locationReference.location".as("level_location"), 
     $"level.locationReference.type".as("level_type"), 
     $"explode_states.state".as("level_state"), 
     $"explode_states.effectiveDateTime".as("level_effectiveDateTime"), 
     $"explode_states.stockQuantity.quantity".as("level_quantity"), 
     $"explode_states.stockQuantity.stockKeepingLevel".as("level_stockKeepingLevel"), 
     $"explode_link.rel".as("level_rel"), 
     $"explode_link.href".as("level_href"), 
     $"explode_link.method".as("level_method")) 
jsonFlatDF_oldLevel.show() 

    } 
+1

偉大的思維方式:)謝謝。我贊成這個答案。 :) –