2016-06-07 53 views
0

我有一個RDD [行]有每行的以下數據星火:轉換RDD [行]到數據幀,其中行中的一列是一個列表

[guid, List(peopleObjects)] 
["123", List(peopleObjects1, peopleObjects2, peopleObjects3)] 

我想將其轉換爲一個數據幀
我使用下面的代碼

val personStructureType = new StructType() 
    .add(StructField("guid", StringType, true)) 
    .add(StructField("personList", StringType, true)) 
val personDF = hiveContext.createDataFrame(personRDD, personStructureType) 

我應該使用不同的數據類型爲我的架構,而不是StringType?

如果我的名單只是它工作的字符串,但是當它是一個列表,我得到以下錯誤

scala.MatchError: List(personObject1, personObject2, personObject3) (of class scala.collection.immutable.$colon$colon) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401) 
    at org.apache.spark.sql.SQLContext$$anonfun$7.apply(SQLContext.scala:445) 
    at org.apache.spark.sql.SQLContext$$anonfun$7.apply(SQLContext.scala:445) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:219) 
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
+0

什麼類型'peopleObject'?如果它是'case class',你能否包含它的定義?更好的辦法是創建你的'RDD'的一些示例代碼。 –

回答

2

這不是完全清楚你正在嘗試做的,但更好的方法,你是什麼試圖做的是創建一個case class,然後將您的RDD行映射到case class,然後調用toDF

喜歡的東西:

case class MyClass(guid: Int, peopleObjects: List[String]) 

val rdd = sc.parallelize(Array((123,List("a","b")),(1232,List("b","d")))) 

val df = rdd.map(r => MyClass(r._1, r._2)).toDF 
df.show 
+----+-------------+ 
|guid|peopleObjects| 
+----+-------------+ 
| 123|  [a, b]| 
|1232|  [b, d]| 
+----+-------------+ 

或者你也可以做到這一點的長手的方式,但不使用的情況下類,像這樣:

val df = sqlContext.createDataFrame(
    rdd.map(r => Row(r._1, r._2)), 
    StructType(Array(
    StructField("guid",IntegerType), 
    StructField("peopleObjects", ArrayType(StringType)) 
)) 
) 
+0

謝謝大衛。這有助於我獲得一些理解。 peopleObjects類具有名稱和位置等屬性。我希望能夠將peopleObjects列表作爲類型傳遞給數據框。這樣當我想創建我的最終輸出格式時,我不必拆卸和重建對象。現在我有一個guid與一個peopleObject關聯,我在guid上獲得一個peopleObject列表的groupbykey。我打算在衆多桌子上做同樣的行爲,然後通過指導加入他們。然後以特定格式創建最終輸出。 –

+0

現在我只是創建json對象,將其作爲字符串傳遞並完成所有聯接。然後重建對象,修改json並創建我的最終輸出。 –

+0

這是一個正確和有用的答案。 @JohnEngelhart你應該接受它。 – Sim