2017-04-07 26 views

回答

1

這是星火2.1.0這裏(應該沒有多大關係,但是......)

轉到the official documentation of aggregate(又名scaladoc)和閱讀:

使用給定的組合函數和中性的「零值」聚合每個分區的元素,然後聚合所有分區的結果。這個函數可以返回與RDD類型不同的結果類型U.因此,我們需要一個將T合併成U的操作,以及一個合併兩個U的操作,就像在scala.TraversableOnce中一樣。這兩個函數都可以修改並返回它們的第一個參數,而不是創建一個新的U以避免內存分配。

簽名是如下(除去隱式參數不特別有趣):

aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U 

的scaladoc說:

零值爲的累加結果的初始值seqOp運算符的每個分區以及combOp運算符的不同分區的組合結果的初始值 - 這通常是中性元素(例如,無列表拼接或0求和)

在你的情況,zeroValue(0, 0)

SEQOP用於分區

在你的情況中累積結果的運營商,seqOp(x, y) => (x._1 + y, x._2 + 1)這是接受兩對功能,遺憾的是名爲xy(我會打電話至少或者甚至使用模式匹配和部分功能,即case ((x1, y1), (x2, y2)) => ...)。

鑑於你有n分區(你可以檢查出來使用rdd.getNumPartition),seqOp將被稱爲n倍。

的scaladoc說:

combOp關聯運營商使用的結果不同分區

這意味着combOp將所有結果相結合的seqOp和應用功能相結合:

(x, y) => (x._1 + y._1, x._2 + y._2) 

它是ag ain寫得很差,所以你看到太多,我甚至會撥打噪音。我會寫出如下功能:

{ case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2) } 

按照類型並給予適當的名稱,最終一切都在斯卡拉變得更加容易;-)

3

結果基本上總說,一個詳細的解釋:我們希望有一個元組(A,B),其中一個是所有元素的總和和b是他們的數量。

這是通過初始化爲(0,0)完成的,然後我們有兩個功能:

  • 第一個函數只是做加法,當我們在同一時間得到一個單一的元素,即元組是通過將值添加到第一個元素並向第二個元素添加1(count)來從單個元素更新。

  • 第二功能合併兩個結果,以便它只是做元件滴添加

讓我們考慮輸入數據的例子:

讓說1,2是在分區1和3,圖3是在隔壁3

分區1計算

分區1將STA rt與(0,0)。

然後第一個函數開始工作。

當我們添加一個我們得到(1,1)。第一個元素是總和(0 + y,其中y是1),第二個元素是計數(0 + 1)。

現在我們加2,所以我們得到(1 + 2,1 + 1)=(3,2)。第一個元素是我們迄今爲止所看到的值的總和,第二個元素是它們的數量。

分區2計算

在第二分區我們再次用(0,0)開始,然後我們從第二得到(3,1)從所述第一3和(6,2)。

合併的結果

現在第二個功能進場合並兩個:通過總結這兩個元素得到(9,4)

我們合併(3,2)和(6,2)