Here is @bbejeck's example of Spark aggregateByKey using a mutable.HashSet[String]:
import scala.collection.mutable

val initialSet = mutable.HashSet.empty[String]
val addToSet = (s: mutable.HashSet[String], v: String) => s += v
val mergePartitionSets = (p1: mutable.HashSet[String], p2: mutable.HashSet[String]) => p1 ++= p2
val uniqueByKey = kv.aggregateByKey(initialSet)(addToSet, mergePartitionSets)
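The mechanics of those three pieces can be checked without Spark: aggregateByKey folds each partition's values with the seqOp (starting from a fresh copy of the zero value) and then merges the per-partition results with the combOp. A plain-Scala sketch of that, using the same three functions (the example data and the two-partition split are made up for illustration):

```scala
import scala.collection.mutable

// Same three functions as in the question
val initialSet = mutable.HashSet.empty[String]
val addToSet = (s: mutable.HashSet[String], v: String) => s += v
val mergePartitionSets =
  (p1: mutable.HashSet[String], p2: mutable.HashSet[String]) => p1 ++= p2

// Pretend these are the values for one key, split across two partitions
val partition1 = Seq("a", "b", "a")
val partition2 = Seq("b", "c")

// Spark hands every partition its own copy of the zero value;
// cloning here mimics that, so initialSet itself is never mutated
val set1 = partition1.foldLeft(initialSet.clone())(addToSet)
val set2 = partition2.foldLeft(initialSet.clone())(addToSet)

// combOp merges the per-partition results
val merged = mergePartitionSets(set1, set2)
println(merged.toSeq.sorted.mkString(","))  // prints "a,b,c"
```

Note that the copy of the zero value matters: because the accumulator is a mutable set, reusing the same instance across partitions would let one partition's fold see another's elements.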
However, when I change the accumulator to a Dataset, I get the following error. Is it because Spark 2.0 (the version I am using) does not support aggregateByKey over Datasets?
java.lang.NullPointerException
at org.apache.spark.sql.Dataset.schema(Dataset.scala:393)
at org.apache.spark.sql.Dataset.toDF(Dataset.scala:339)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
The code is below:
case class Food(name: String,
                price: String,
                e_date: String)

rdd.aggregateByKey(Seq(Food("", "", "")).toDS)(
  (f1, f2) => f1.union(f2),
  (f1, f2) => f1.union(f2))
The output is:
found f1 = Invalid tree; null:
null
Any idea why this happens? Thanks in advance!
Thanks @Bigby, I removed the .toDS and it works now – faustineinsun
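For anyone hitting the same NullPointerException: the zero value of aggregateByKey is serialized into every task, and a Dataset cannot survive that round trip, which is presumably why it arrives as a null tree. Dropping .toDS makes the accumulator a plain Seq[Food], on which .union is ordinary concatenation. A plain-Scala sketch of the fixed combination logic (the Food values and the two-partition split are invented; this does not run against a live Spark cluster):

```scala
case class Food(name: String, price: String, e_date: String)

// Zero value without .toDS: a plain Seq[Food]
val zero = Seq.empty[Food]
// Same function works as both seqOp and combOp: Seq.union concatenates
val combine = (f1: Seq[Food], f2: Seq[Food]) => f1.union(f2)

// Simulate one key whose values arrive in two partitions
val partition1 = Seq(Seq(Food("apple", "1", "2016")), Seq(Food("pear", "2", "2016")))
val partition2 = Seq(Seq(Food("plum", "3", "2016")))

// seqOp fold per partition, then combOp merge across partitions
val r1 = partition1.foldLeft(zero)(combine)
val r2 = partition2.foldLeft(zero)(combine)
val result = combine(r1, r2)
println(result.map(_.name).mkString(","))  // prints "apple,pear,plum"
```

Using Seq.empty[Food] as the zero value also avoids the dummy Food("", "", "") element that Seq(Food("", "", "")) would leave in every result. If a Dataset is really needed, call .toDS (or spark.createDataset) on the driver after the RDD action completes.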