
Is selection from nested structures limited in a Spark DataFrame?

I have a JSON file with some data. I am able to create a DataFrame from it, and the schema of the particular part I'm interested in looks like this:

val json: DataFrame = sqlc.load("entities_with_address2.json", "json")

root 
|-- attributes: struct (nullable = true) 
| |-- Address2: array (nullable = true) 
| | |-- value: struct (nullable = true) 
| | | |-- Zip: array (nullable = true) 
| | | | |-- element: struct (containsNull = true) 
| | | | | |-- value: struct (nullable = true) 
| | | | | | |-- Zip5: array (nullable = true) 
| | | | | | | |-- element: struct (containsNull = true) 
| | | | | | | | |-- value: string (nullable = true) 

When I try to select only the deepest field:

json.select("attributes.Address2.value.Zip.value.Zip5").collect()

it gives me an exception:

org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type ArrayType(ArrayType(StructType(StructField(value, StructType(StructField(Zip5, ArrayType(StructType(StructField(value, StringType, true)), true), true)), true)), true), true);

Looking at the resolveGetField method of LogicalPlan, I found that you can select from a StructType or from an ArrayType(StructType), but is there any way to select one level deeper than that? How can I select the field I need?

Here is the full exception:

org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type ArrayType(ArrayType(StructType(StructField(value,StructType(StructField(Zip5,ArrayType(StructType(StructField(value,StringType,true)),true),true)),true)),true),true); 
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveGetField(LogicalPlan.scala:265) 
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:214) 
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:214) 
     at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) 
     at scala.collection.immutable.List.foldLeft(List.scala:84) 
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:214) 
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:117) 
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:50) 
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:46) 
     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:252) 
     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:252) 
     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51) 
     at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:251) 
     at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionUp$1(QueryPlan.scala:108) 
     at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2$$anonfun$apply$2.apply(QueryPlan.scala:123) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
     at scala.collection.immutable.List.foreach(List.scala:318) 
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
     at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
     at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:122) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
     at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
     at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:127) 
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:46) 
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:44) 
     at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:89) 
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:44) 
     at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:40) 
     at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:1080) 
     at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133) 
     at org.apache.spark.sql.DataFrame.logicalPlanToDataFrame(DataFrame.scala:157) 
     at org.apache.spark.sql.DataFrame.select(DataFrame.scala:476) 
     at org.apache.spark.sql.DataFrame.select(DataFrame.scala:491) 
     at com.reltio.analytics.PREDF.test(PREDF.scala:55) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) 
     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 
     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) 
     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 
     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) 
     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) 
     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) 
     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) 
     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) 
     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) 
     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) 
     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) 
     at org.junit.runners.ParentRunner.run(ParentRunner.java:309) 
     at org.junit.runner.JUnitCore.run(JUnitCore.java:160) 
     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74) 
     at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211) 
     at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67) 

Answer


The problem is the array type. You can recreate this error very simply:

// requires the SQLContext implicits for $ and toDF: import sqlContext.implicits._ 
val df = Seq(Tuple1(Array[String]())).toDF("users") 

At this point, df.printSchema shows:

root 
|-- users: array (nullable = true) 
| |-- element: string (containsNull = true) 

Now, if you try:

df.select($"users.element") 

you get exactly the same exception: GetField is not valid...

You have a couple of different options for unwrapping the array. You can get at a single item with getItem, like this:

df.select($"users".getItem(0)) 

And since getItem returns another Column, you can keep digging as deep as you want:

df.select($"attributes.Address2".getItem(0).getField("value").getField("Zip").getItem(...) 
// etc 
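
For instance, against the schema in the question, a minimal sketch that picks the first element at each array level; the 0 indices are purely illustrative, and the $ syntax assumes the sqlContext implicits are in scope:

json.select( 
  $"attributes.Address2".getItem(0)   // first Address2 entry 
    .getField("value") 
    .getField("Zip").getItem(0)       // first Zip entry 
    .getField("value") 
    .getField("Zip5").getItem(0)      // first Zip5 entry 
    .getField("value"))               // the zip string itself 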

But with an array, you probably want to unwrap the whole array programmatically. If you look at how Hive handles this, you would do a LATERAL VIEW. In Spark, you have to use explode to create the equivalent of a Hive LATERAL VIEW:

case class User(name: String) 
// each element of the users array becomes its own row, typed via the case class 
df.explode($"users"){ case Row(arr: Array[String]) => arr.map(User(_)) } 

Note that I map with a case class, which is what the documentation uses. If you don't want to create a case class, you can just return a Tuple1 (or Tuple2, Tuple3, and so on):

df.explode($"users"){ case Row(arr: Array[String]) => arr.map(Tuple1(_)) } 

David, thanks for the reply. Now it's clear why it doesn't work: projection is only possible from a Struct or an Array(Struct) (it's in the LogicalPlan class). I just wanted to be sure I wasn't missing something I didn't fully understand. Even though the answer isn't what I was hoping for, I really appreciate it, since I can see that someone else tried and hit the same wall. It looks like the only way is to explode and then project. – evgenii
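
For reference, a hedged end-to-end sketch of that explode-then-project approach against the schema in the question. It assumes each struct carries exactly the fields shown by printSchema, and that Spark SQL hands JSON arrays of structs back as Seq[Row]; the pattern matches would need adjusting if the real structs have extra fields:

import org.apache.spark.sql.Row 

val zip5s = json.explode($"attributes.Address2") { 
  case Row(addresses: Seq[Row @unchecked]) => 
    for { 
      Row(Row(zips: Seq[Row @unchecked])) <- addresses   // Address2 element -> value.Zip 
      Row(Row(zip5Arr: Seq[Row @unchecked])) <- zips     // Zip element -> value.Zip5 
      Row(zip5: String) <- zip5Arr                       // Zip5 element -> value 
    } yield Tuple1(zip5) 
} 
zip5s.select($"_1").collect()                            // one row per Zip5 string 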