2016-05-10 42 views
0

下面這段代碼是應該找到每密鑰的平均使用combineByKey():CombineBy重點星火方法

val result = input.combineByKey(
(v) => (v, 1), 
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), 
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)). 
map{ case (key, value) => (key, value._1/value._2.toFloat) } 
result.collectAsMap().map(println(_)) 

我上面的方法的執行混亂。假設我們有數據集
((1,1), (1,3), (2,4), (2,3), (3,1))

所以combineByKey的執行將是這個樣子?:

1)首先,它會創建一個累加器(1,1)
2)然後,當它遇到一個具有相同鍵(1)的元組時,它會將鍵值添加到一起?因此,當遇到(1,3)時,密鑰1的新累加器將看起來像(2,2)。由於它添加了(1,1) and (1,3)的密鑰,並且由於有兩個元組和密鑰1,所以它將在(2,2)中放置一個2(在右側)。
3)然後它將繼續爲所有相同的密鑰執行此操作。
4)然後最後它將從每個分區獲取所有累加器,並將鍵(元組左側)和發生次數(元組右側)添加到一個元組中爲每個鍵。

對不起,如果這是有點關閉,我仍然習慣於函數式編程方法!

回答

1

通常情況下,通過查看方法的類型和包含類可以得到很多清晰的結果。

PairRDDFunctions[K, V]

def combineByKey[C]( createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

我們與2型參數,可以一鍵和值,並與一個多,一個組合的方法的類。

系統會要求您提供功能

  • 把一個值轉換成合
  • 把一個價值以及合併成一個組合
  • 轉合和組合成一個組合

立即,這使得你的描述,加上密鑰,因爲我們還沒有提供任何方法操作鍵。

對於每個鍵:

  1. 首先它將從一個值通過將值插入到tuple2的第一時隙以1在第二時隙(1, 1)創建組合器,在這種情況下。
  2. 然後它將通過將值添加到tuple2的第一個槽並遞增第二個槽來將同一個關鍵字的每個附加值合併到組合器中。 (1 + 3,1 + 1)==(4,2)
  3. 然後,它將繼續爲同一個鍵的所有條目執行此操作。
  4. 然後最後它將從每個分區中獲取所有累加器,並將值(元組左側)和發生次數(元組右側)添加到一個元組中每個鍵。

您的困惑可能源於您的密鑰和值屬於同一類型。如果您將密鑰更改爲Strings,則代碼會進行編譯,但如果您使用值執行此操作,則不會。

+0

哦,你指出了什麼讓我感到困惑,事實上,關鍵和價值都是數字!非常感謝您的詳細解釋,現在有道理! – LP496

+0

也只是爲了澄清它在最後一次「從每個分區獲取所有累加器並將值(元組左側)和發生次數(元組右側)添加到每個鍵一個元組「。它知道每個鍵值對來自不同分區的哪個鍵。因此,讓我們從分區1和2分別獲得關鍵字1的(5,4)和(3,2)。和(3,4)和(4,5)分別爲關鍵2。所以spark會知道將key 1元組和key 2元組合在一起? – LP496

+1

是的,你的結構基本上是'(鍵:Int,(總和:Int,count; Int))' –