
Spark aggregateByKey on Dataset

Here is an example of aggregateByKey on a mutable.HashSet[String], written by @bbejeck:

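import scala.collection.mutable
// kv is a pair RDD whose values are Strings
val initialSet = mutable.HashSet.empty[String]
// seqOp: add one value to the set kept for a key within a partition
val addToSet = (s: mutable.HashSet[String], v: String) => s += v
// combOp: merge the sets built for the same key on different partitions
val mergePartitionSets = (p1: mutable.HashSet[String], p2: mutable.HashSet[String]) => p1 ++= p2
val uniqueByKey = kv.aggregateByKey(initialSet)(addToSet, mergePartitionSets)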
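
For reference, here is a minimal self-contained sketch of how that RDD snippet might be run locally. It assumes a local SparkContext. The object name AggregateByKeyExample, the helper uniqueValuesByKey, and the sample pairs are hypothetical and do not come from @bbejeck's post.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable

object AggregateByKeyExample {

  // Hypothetical helper: collects the distinct values seen for each key.
  def uniqueValuesByKey(sc: SparkContext,
                        pairs: Seq[(String, String)]): Map[String, Set[String]] = {
    // Two partitions, so that both seqOp and combOp can be exercised.
    val kv = sc.parallelize(pairs, numSlices = 2)

    val initialSet = mutable.HashSet.empty[String]
    // seqOp: fold one value into the per-key set inside a partition.
    val addToSet = (s: mutable.HashSet[String], v: String) => s += v
    // combOp: merge per-key sets coming from different partitions.
    val mergePartitionSets =
      (p1: mutable.HashSet[String], p2: mutable.HashSet[String]) => p1 ++= p2

    kv.aggregateByKey(initialSet)(addToSet, mergePartitionSets)
      .mapValues(_.toSet) // convert to immutable sets before collecting
      .collect()
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("aggregateByKey-sketch")
    val sc = new SparkContext(conf)
    try {
      // Sample data (an assumption for illustration only).
      val pairs = Seq("a" -> "x", "a" -> "y", "a" -> "x", "b" -> "z")
      uniqueValuesByKey(sc, pairs).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Here the zero value and the accumulators are ordinary serializable Scala collections, which is what the RDD version relies on.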

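However, when I switched to a Dataset, I got the following error. Is it because Spark 2.0 (the version I'm using) does not support aggregateByKey on a Dataset?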

java.lang.NullPointerException 
at org.apache.spark.sql.Dataset.schema(Dataset.scala:393) 
at org.apache.spark.sql.Dataset.toDF(Dataset.scala:339) 
at org.apache.spark.sql.Dataset.showString(Dataset.scala:239) 
at org.apache.spark.sql.Dataset.show(Dataset.scala:526) 
at org.apache.spark.sql.Dataset.show(Dataset.scala:486) 
at org.apache.spark.sql.Dataset.show(Dataset.scala:495) 

The code is below:

case class Food(name: String,
                price: String,
                e_date: String)
rdd.aggregateByKey(Seq(Food("", "", "")).toDS)(
  (f1, f2) => f1.union(f2),
  (f1, f2) => f1.union(f2))
///////// 
found f1 = Invalid tree; null: 
        null 

Any idea why this happens? Thanks in advance!

Answers