
Re-using a JSON schema in a Spark DataFrame with Scala

I have some JSON data like this in a Spark DataFrame:

{"gid":"111","createHour":"2014-10-20 01:00:00.0","revisions":[{"revId":"2","modDate":"2014-11-20 01:40:37.0"},{"revId":"4","modDate":"2014-11-20 01:40:40.0"}],"comments":[],"replies":[]} 
{"gid":"222","createHour":"2014-12-20 01:00:00.0","revisions":[{"revId":"2","modDate":"2014-11-20 01:39:31.0"},{"revId":"4","modDate":"2014-11-20 01:39:34.0"}],"comments":[],"replies":[]} 
{"gid":"333","createHour":"2015-01-21 00:00:00.0","revisions":[{"revId":"25","modDate":"2014-11-21 00:34:53.0"},{"revId":"110","modDate":"2014-11-21 00:47:10.0"}],"comments":[{"comId":"4432","content":"How are you?"}],"replies":[{"repId":"4441","content":"I am good."}]} 
{"gid":"444","createHour":"2015-09-20 23:00:00.0","revisions":[{"revId":"2","modDate":"2014-11-20 23:23:47.0"}],"comments":[],"replies":[]} 
{"gid":"555","createHour":"2016-01-21 01:00:00.0","revisions":[{"revId":"135","modDate":"2014-11-21 01:01:58.0"}],"comments":[],"replies":[]} 
{"gid":"666","createHour":"2016-04-23 19:00:00.0","revisions":[{"revId":"136","modDate":"2014-11-23 19:50:51.0"}],"comments":[],"replies":[]} 

I can read it with:

val df = sqlContext.read.json("./data/full.json") 

I can print the schema using df.printSchema:

root
 |-- comments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- comId: string (nullable = true)
 |    |    |-- content: string (nullable = true)
 |-- createHour: string (nullable = true)
 |-- gid: string (nullable = true)
 |-- replies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- content: string (nullable = true)
 |    |    |-- repId: string (nullable = true)
 |-- revisions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- modDate: string (nullable = true)
 |    |    |-- revId: string (nullable = true)

I can display the data using df.show(10, false):

+---------------------+---------------------+---+-------------------+---------------------------------------------------------+
|comments             |createHour           |gid|replies            |revisions                                                |
+---------------------+---------------------+---+-------------------+---------------------------------------------------------+
|[]                   |2014-10-20 01:00:00.0|111|[]                 |[[2014-11-20 01:40:37.0,2], [2014-11-20 01:40:40.0,4]]   |
|[]                   |2014-12-20 01:00:00.0|222|[]                 |[[2014-11-20 01:39:31.0,2], [2014-11-20 01:39:34.0,4]]   |
|[[4432,How are you?]]|2015-01-21 00:00:00.0|333|[[I am good.,4441]]|[[2014-11-21 00:34:53.0,25], [2014-11-21 00:47:10.0,110]]|
|[]                   |2015-09-20 23:00:00.0|444|[]                 |[[2014-11-20 23:23:47.0,2]]                              |
|[]                   |2016-01-21 01:00:00.0|555|[]                 |[[2014-11-21 01:01:58.0,135]]                            |
|[]                   |2016-04-23 19:00:00.0|666|[]                 |[[2014-11-23 19:50:51.0,136]]                            |
+---------------------+---------------------+---+-------------------+---------------------------------------------------------+

I can grab the schema with val dfSc = df.schema and print it as:

StructType(StructField(comments,ArrayType(StructType(StructField(comId,StringType,true), StructField(content,StringType,true)),true),true), StructField(createHour,StringType,true), StructField(gid,StringType,true), StructField(replies,ArrayType(StructType(StructField(content,StringType,true), StructField(repId,StringType,true)),true),true), StructField(revisions,ArrayType(StructType(StructField(modDate,StringType,true), StructField(revId,StringType,true)),true),true)) 

I can print it more readably:

println(df.schema.fields.mkString(",\n")) 
StructField(comments,ArrayType(StructType(StructField(comId,StringType,true), StructField(content,StringType,true)),true),true), 
StructField(createHour,StringType,true), 
StructField(gid,StringType,true), 
StructField(replies,ArrayType(StructType(StructField(content,StringType,true), StructField(repId,StringType,true)),true),true), 
StructField(revisions,ArrayType(StructType(StructField(modDate,StringType,true), StructField(revId,StringType,true)),true),true) 

Now, if I read the same kind of file without the comments and replies (by simply deleting those entries) using val df2 = sqlContext.read.json("./data/partialRevOnly.json"), printSchema gives me something like this:

root
 |-- comments: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- createHour: string (nullable = true)
 |-- gid: string (nullable = true)
 |-- replies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- revisions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- modDate: string (nullable = true)
 |    |    |-- revId: string (nullable = true)

I don't like that, so I use:

val df3 = sqlContext.read. 
    schema(dfSc). 
    json("./data/partialRevOnly.json") 

where dfSc is the schema from the original read. So now I get back the schema I had before deleting the data:

root
 |-- comments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- comId: string (nullable = true)
 |    |    |-- content: string (nullable = true)
 |-- createHour: string (nullable = true)
 |-- gid: string (nullable = true)
 |-- replies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- content: string (nullable = true)
 |    |    |-- repId: string (nullable = true)
 |-- revisions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- modDate: string (nullable = true)
 |    |    |-- revId: string (nullable = true)

This is perfect... almost. I would like to assign this schema to a variable, similar to this:

val textSc = StructField(comments,ArrayType(StructType(StructField(comId,StringType,true), StructField(content,StringType,true)),true),true), 
    StructField(createHour,StringType,true), 
    StructField(gid,StringType,true), 
    StructField(replies,ArrayType(StructType(StructField(content,StringType,true), StructField(repId,StringType,true)),true),true), 
    StructField(revisions,ArrayType(StructType(StructField(modDate,StringType,true), StructField(revId,StringType,true)),true),true) 

OK - that won't work because of the missing double quotes and some other structural things, so try this (which errors):

import org.apache.spark.sql.types._ 

val textSc = StructType(Array(
    StructField("comments",ArrayType(StructType(StructField("comId",StringType,true), StructField("content",StringType,true)),true),true), 
    StructField("createHour",StringType,true), 
    StructField("gid",StringType,true), 
    StructField("replies",ArrayType(StructType(StructField("content",StringType,true), StructField("repId",StringType,true)),true),true), 
    StructField("revisions",ArrayType(StructType(StructField("modDate",StringType,true), StructField("revId",StringType,true)),true),true) 
)) 

Name: Compile Error 
Message: <console>:78: error: overloaded method value apply with alternatives: 
    (fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and> 
    (fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and> 
    (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType 
cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField) 
      StructField("comments",ArrayType(StructType(StructField("comId",StringType,true), StructField("content",StringType,true)),true),true), 

... and without this error (which I can't figure out a shortcut around), I would then like to use textSc in place of dfSc to read the JSON data with an imposed schema.

I cannot find a '1-to-1 match' way of getting (via println or ...) the schema into an acceptable syntax like the above. I suppose some coding could strip the double quotes via case matching, but I am still unclear which rules are needed to lift the exact schema out of the test harness so that I can reuse it in my recurring production (vs. test harness) code. Is there a way to get this schema to print exactly as I would code it?

Note: this includes double quotes and all the proper StructFields/types and so on, ready to drop into code.

As a fallback, I thought about saving a fully-formed golden JSON file for use at the start of the Spark job, though I would like to eventually use date fields and other more concise types rather than strings where structurally applicable.
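Something like this minimal sketch (where golden.json is a hypothetical one-record file containing a fully populated row):

// Infer the schema from a fully-formed sample file, then impose it on the
// real (possibly sparse) input, mirroring the df3 read above
val goldenSchema = sqlContext.read.json("./data/golden.json").schema
val dfProd = sqlContext.read.schema(goldenSchema).json("./data/partialRevOnly.json")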

How do I get the DataFrame information from my test harness (using a fully-formed JSON input line with comments and replies) to the point where I can put the schema as source code into the production Scala Spark job?

Note: the best answer is some coding means, but an explanation so that I can trudge, plod, toil, wade, plow and slog through the coding myself is also helpful. :)

Answers


Well, the error message should tell you everything you have to know here - StructType expects a sequence of fields as an argument. So in your case the schema should look like this:

StructType(Seq(
  StructField("comments", ArrayType(StructType(Seq(    // <- Seq[StructField]
    StructField("comId", StringType, true),
    StructField("content", StringType, true))), true), true),
  StructField("createHour", StringType, true),
  StructField("gid", StringType, true),
  StructField("replies", ArrayType(StructType(Seq(     // <- Seq[StructField]
    StructField("content", StringType, true),
    StructField("repId", StringType, true))), true), true),
  StructField("revisions", ArrayType(StructType(Seq(   // <- Seq[StructField]
    StructField("modDate", StringType, true),
    StructField("revId", StringType, true))), true), true)))

OK - I'll admit the error message was a bit cryptic to me. I've tested your suggestion and it works. I am still seeking a programmatic way to build the schema, but with your help I now know how to make the schema at hand..., which is a big step - especially for when that schema needs to be tweaked so that columns like dates get more representative types than strings. Thanks for your help. – codeaperature
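
(On the date-typed columns mentioned above, a hedged sketch: rather than changing the imposed schema, one option is to cast after reading - assuming the df3 from the question:)

import org.apache.spark.sql.functions.col

// Hypothetical sketch: cast the string timestamp to a real TimestampType column
val dfTyped = df3.withColumn("createHour", col("createHour").cast("timestamp"))
dfTyped.printSchema()  // createHour: timestamp (nullable = true)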


The error certainly is cryptic. Types and other classes in Scala can appear like functions when they have an 'apply' method defined; appending '.apply' is optional, e.g. 'StructType.apply()' is the same as 'StructType()'. That explains the 'method value apply' part of the error. 'Overloaded' means the StructType apply method (technically also a constructor) has multiple incantations: it can take a 'Seq', 'Array' or 'List'. Any of those would work here; however, Seq is a nice choice as it is more generic and doesn't require the extra features of List or Array. – Davos
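
(A minimal sketch of that apply equivalence from the comment above:)

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// StructType(...) is sugar for StructType.apply(...); both build the same schema
val viaSugar = StructType(Seq(StructField("gid", StringType, true)))
val viaApply = StructType.apply(Seq(StructField("gid", StringType, true)))
assert(viaSugar == viaApply)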


I ran into this recently. I'm using Spark 2.0.2, so I don't know whether this solution works with earlier versions.

import scala.util.Try 
import org.apache.spark.sql.Dataset 
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser 
import org.apache.spark.sql.types.{DataType, StructType} 

/** Produce a Schema string from a Dataset */ 
def serializeSchema(ds: Dataset[_]): String = ds.schema.json 

/** Produce a StructType schema object from a JSON string */ 
def deserializeSchema(json: String): StructType = { 
    Try(DataType.fromJson(json)).getOrElse(LegacyTypeStringParser.parse(json)) match { 
     case t: StructType => t 
     case _ => throw new RuntimeException(s"Failed parsing StructType: $json") 
    } 
} 

Note that I simply copied the "deserialize" function from a private function in Spark's StructType object. I don't know how well it will be supported across versions.
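
A round-trip usage sketch (assuming a Spark 2.x SparkSession named spark and the file paths from the question):

// Capture the schema as a JSON string in the test harness...
val schemaString = serializeSchema(spark.read.json("./data/full.json"))

// ...then rebuild and impose it in the production job
val partial = spark.read.
    schema(deserializeSchema(schemaString)).
    json("./data/partialRevOnly.json")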


I tried it with Spark 2.1.0 and it works well. You just need to import these: 'import org.apache.spark.sql.Dataset' 'import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser' 'import scala.util.Try' 'val caseclassstring = """StructType(StructType(comments,ArrayType(StructType(List(StructField(comId,DateType,true)..."""' 'deserializeSchema(caseclassstring)' – Davos


Thanks, I've added those imports to the code sample –