誰能給出瞭如何在以下火花agrregate動作產生的(9,4)RDD.aggregate動作如何工作(即如何理解參數)?
val rdd = sc.parallelize(List(1,2,3,3))
rdd.aggregate((0, 0))
((x, y) =>
(x._1 + y, x._2 + 1),
(x, y) =>
(x._1 + y._1, x._2 + y._2))
res : (9,4)
誰能給出瞭如何在以下火花agrregate動作產生的(9,4)RDD.aggregate動作如何工作(即如何理解參數)?
val rdd = sc.parallelize(List(1,2,3,3))
rdd.aggregate((0, 0))
((x, y) =>
(x._1 + y, x._2 + 1),
(x, y) =>
(x._1 + y._1, x._2 + y._2))
res : (9,4)
這是星火2.1.0這裏(應該沒有多大關係,但是......)
轉到the official documentation of aggregate
(又名scaladoc)和閱讀:
使用給定的組合函數和中性的「零值」聚合每個分區的元素,然後聚合所有分區的結果。這個函數可以返回與RDD類型不同的結果類型U.因此,我們需要一個將T合併成U的操作,以及一個合併兩個U的操作,就像在scala.TraversableOnce中一樣。這兩個函數都可以修改並返回它們的第一個參數,而不是創建一個新的U以避免內存分配。
簽名是如下(除去隱式參數不特別有趣):
aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U
的scaladoc說:
零值爲的累加結果的初始值seqOp運算符的每個分區以及combOp運算符的不同分區的組合結果的初始值 - 這通常是中性元素(例如,無列表拼接或0求和)
在你的情況,zeroValue
是(0, 0)
。
SEQOP用於分區
在你的情況中累積結果的運營商,seqOp
是(x, y) => (x._1 + y, x._2 + 1)
這是接受兩對功能,遺憾的是名爲x
和y
(我會打電話至少或者甚至使用模式匹配和部分功能,即case ((x1, y1), (x2, y2)) => ...
)。
鑑於你有n
分區(你可以檢查出來使用rdd.getNumPartition
),seqOp
將被稱爲n
倍。
的scaladoc說:
combOp關聯運營商使用的結果不同分區
這意味着combOp
將所有結果相結合的seqOp
和應用功能相結合:
(x, y) => (x._1 + y._1, x._2 + y._2)
它是ag ain寫得很差,所以你看到太多,我甚至會撥打噪音。我會寫出如下功能:
{ case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2) }
按照類型並給予適當的名稱,最終一切都在斯卡拉變得更加容易;-)
結果基本上總說,一個詳細的解釋:我們希望有一個元組(A,B),其中一個是所有元素的總和和b是他們的數量。
這是通過初始化爲(0,0)完成的,然後我們有兩個功能:
第一個函數只是做加法,當我們在同一時間得到一個單一的元素,即元組是通過將值添加到第一個元素並向第二個元素添加1(count)來從單個元素更新。
第二功能合併兩個結果,以便它只是做元件滴添加
讓我們考慮輸入數據的例子:
讓說1,2是在分區1和3,圖3是在隔壁3
分區1計算
分區1將STA rt與(0,0)。
然後第一個函數開始工作。
當我們添加一個我們得到(1,1)。第一個元素是總和(0 + y,其中y是1),第二個元素是計數(0 + 1)。
現在我們加2,所以我們得到(1 + 2,1 + 1)=(3,2)。第一個元素是我們迄今爲止所看到的值的總和,第二個元素是它們的數量。
分區2計算
在第二分區我們再次用(0,0)開始,然後我們從第二得到(3,1)從所述第一3和(6,2)。
合併的結果
現在第二個功能進場合並兩個:通過總結這兩個元素得到(9,4)
我們合併(3,2)和(6,2)