我試圖學習除RDD之外還使用更多的DataFrame和DataSet。對於RDD,我知道我可以做someRDD.reduceByKey((x,y) => x + y)
,但是我沒有看到Dataset的這個函數。所以我決定寫一個。在Spark數據集中滾動您自己的reduceByKey
someRdd.map(x => ((x.fromId,x.toId),1)).map(x => collection.mutable.Map(x)).reduce((x,y) => {
val result = mutable.HashMap.empty[(Long,Long),Int]
val keys = mutable.HashSet.empty[(Long,Long)]
y.keys.foreach(z => keys += z)
x.keys.foreach(z => keys += z)
for (elem <- keys) {
val s1 = if(x.contains(elem)) x(elem) else 0
val s2 = if(y.contains(elem)) y(elem) else 0
result(elem) = s1 + s2
}
result
})
但是,這會將所有內容都返回給驅動程序。你會如何寫這個返回Dataset
?也許mapPartition並在那裏做?
注意此編譯但不運行,因爲它沒有對編碼器還Map
使用Spark 2.0.0,請試試這個,yourDataset.groupByKey(...)。reduceGroups(...) –
催化劑優化器是否會注意到您正在進行一個組,然後使其更有效? 「有效率」我的意思是關於RDD如何通過密鑰進行降低比通過減少的方式來實現組更好? –