2016-07-14 21 views
15

我試圖學習除RDD之外還使用更多的DataFrame和DataSet。對於RDD,我知道我可以做someRDD.reduceByKey((x,y) => x + y),但是我沒有看到Dataset的這個函數。所以我決定寫一個。在Spark數據集中滾動您自己的reduceByKey

someRdd.map(x => ((x.fromId,x.toId),1)).map(x => collection.mutable.Map(x)).reduce((x,y) => { 
    val result = mutable.HashMap.empty[(Long,Long),Int] 
    val keys = mutable.HashSet.empty[(Long,Long)] 
    y.keys.foreach(z => keys += z) 
    x.keys.foreach(z => keys += z) 
    for (elem <- keys) { 
    val s1 = if(x.contains(elem)) x(elem) else 0 
    val s2 = if(y.contains(elem)) y(elem) else 0 
    result(elem) = s1 + s2 
    } 
    result 
}) 

但是,這會將所有內容都返回給驅動程序。你會如何寫這個返回Dataset?也許mapPartition並在那裏做?

注意此編譯但不運行,因爲它沒有對編碼器還Map

+0

使用Spark 2.0.0,請試試這個,yourDataset.groupByKey(...)。reduceGroups(...) –

+6

催化劑優化器是否會注意到您正在進行一個組,然後使其更有效? 「有效率」我的意思是關於RDD如何通過密鑰進行降低比通過減少的方式來實現組更好? –

回答

18

我假設你的目標是把這個成語數據集:

rdd.map(x => (x.someKey, x.someField)) 
    .reduceByKey(_ + _) 

// => returning an RDD of (KeyType, FieldType) 

目前,最接近的解決方案我有數據集API發現是這樣的:

ds.map(x => (x.someKey, x.someField))   // [1] 
    .groupByKey(_._1)        
    .reduceGroups((a, b) => (a._1, a._2 + b._2)) 
    .map(_._2)         // [2] 

// => returning a Dataset of (KeyType, FieldType) 

// Comments: 
// [1] As far as I can see, having a map before groupByKey is required 
//  to end up with the proper type in reduceGroups. After all, we do 
//  not want to reduce over the original type, but the FieldType. 
// [2] required since reduceGroups converts back to Dataset[(K, V)] 
//  not knowing that our V's are already key-value pairs. 

看起來不很優雅,並根據快速基準,也就是多升ess性能,所以也許我們在這裏丟失了一些東西...

注意:替代可能是使用groupByKey(_.someKey)作爲第一步。問題是使用groupByKey將類型從常規Dataset更改爲KeyValueGroupedDataset。後者不具有常規的map功能。相反,它提供了一個mapGroups,這看起來不太方便,因爲它將值包裝到Iterator中,並根據文檔字符串執行洗牌。

+3

這個技巧。儘管如此,reduceByKey更有效,因爲它在洗牌之前在每個節點上減少。做groupByKey首先洗牌所有的元素,然後開始減少。這就是爲什麼它性能低得多。有趣的是,這是我以前做的事情,我知道有關reduceByKey但我忘記了:-) –

+0

@CarlosBribiescas我已閱讀interwebs數據集利用Sparks的Catalyst優化器,並應能夠推下在洗牌之前減少功能。這可以解釋爲什麼'Dataset' API中沒有'reduceByKey'。然而,根據我的經驗,情況並非如此,'groupByKey.reduceGroups'洗牌的數據要多得多,而且比'reduceByKey'慢得多。 –

+4

看起來reduceGroups的性能已經從2.0.1和2.1.0 [Spark-16391](https://issues.apache.org/jira/browse/SPARK-16391) – Franzi

3

更有效的解決方案使用mapPartitionsgroupByKey前減少洗牌量(注意,這是不完全一樣的簽名reduceByKey但我認爲這是更加靈活,通過比需要的數據集包括一個元組的函數)。

def reduceByKey[V: ClassTag, K](ds: Dataset[V], f: V => K, g: (V, V) => V) 
    (implicit encK: Encoder[K], encV: Encoder[V]): Dataset[(K, V)] = { 
    def h[V: ClassTag, K](f: V => K, g: (V, V) => V, iter: Iterator[V]): Iterator[V] = { 
    iter.toArray.groupBy(f).mapValues(_.reduce(g)).map(_._2).toIterator 
    } 
    ds.mapPartitions(h(f, g, _)) 
    .groupByKey(f)(encK) 
    .reduceGroups(g) 
} 

根據數據的形狀/尺寸,這是reduceByKey性能的1秒鐘之內,以及約2x一樣快groupByKey(_._1).reduceGroups。還有改進的餘地,所以建議是值得歡迎的。