2015-11-17 64 views
0

從理論上講,我認爲我理解aggregate的工作方式,但我無法通過一個非常簡單的例子。Apache PySpark版本之間的Spark聚合函數是否發生了變化?

值得注意的是,示例here似乎有錯誤的結果。當我在我的機器,我得到.....

seqOp = (lambda x, y: (x[0] + y, x[1] + 1)) 
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1])) 
ag = sc.parallelize([1, 2, 3, 4]).aggregate((1,0), seqOp, combOp) 

上運行下面的示例然後,結果我得到的是

>>> ag 
(12, 4) 

但是,我引用的鏈接說,結果是(19, 4)。這傢伙正在使用不同版本的火花,(1.2.0)。我正在使用1.5.2. Spark的各個版本之間的聚合函數是否發生了變化?

如果答案是NO,那麼它仍然是莫名其妙的如何12是該元組中的第一個元素。只檢查元組的第一個元素,我們可以看到
y被添加到RDD中每個元素的元組的第一個元素。

因此,從(1,0)開始,並且因爲y分別是1, 2, 3, 4,,這應該會導致一系列元組,如:(2,1), (3,1), (4,1), (5,1)。現在,當我添加元組中的第一個元素時,我得到了14?有沒有什麼明顯的我缺少如何獲得12?非常感謝。

+1

可能的重複[在使用python解釋Spark中的聚合函數](http://stackoverflow.com/questions/28240706/explain-the-aggregate-functionality-in-spark-using-python) – zero323

+0

至於命題這是重複的,我仍然對我如何在字面上運行相同的代碼並在我的機器上獲得不同的結果感到困惑? – Candic3

+1

差不多每次都會通過不交換/關聯的函數來減少會給你不同的結果。從代數的角度來看,這些不是有效的操作。這在約翰斯的回答中有很多描述。我甚至不會回答這個問題,但我錯過了這個。我已經對這個答案做了一個小小的修改,但這裏真的沒有更多。 – zero323

回答

1

不,aggregate函數的行爲沒有改變。

您鏈接的示例有問題,即零元素不是中性的。

sc.parallelize([], 10).aggregate((1,0), seqOp, combOp) 
## (11, 0) 

sc.parallelize([], 100).aggregate((1,0), seqOp, combOp) 
## (101, 0) 

asc.parallelize([], 1000).aggregate((1,0), seqOp, combOp) 
## (1001, 0) 

帶走這裏的信息是,零值應爲:因爲在實際零值每個分區創建一次,你實際上可以只通過增加分區的數量和在所有路過沒有數據遞增元組的第一個元素您執行的中立給定操作。

編輯

這是什麼意思由中立的元素?在代數意義上,它應該是identity element關於seqOp/combOp。我在這裏定義的一個操作案例很好的選擇是(0, 0)

從開發人員的角度來看,您可以認爲zeroElement被添加到您的數據的次數不是合同的一部分。

+0

值爲「中性」意味着什麼? – Candic3

+0

即使在你的解釋之後,我仍然不知道爲什麼我的一些例子不起作用:'dotproductRDD = sc.parallelize([(4,1),(3,2)])',聚合函數:' dotproductRDD.aggregate(1,lambda acc,aTuple:aTuple [0] * aTuple [1],lambda a,b:a + b)',結果爲7.我甚至不使用零值? – Candic3

+0

我甚至不知道你爲什麼試圖實現。你的意思是像這樣'dotproductRDD..map(lambda aTuple:aTuple [0] * aTuple [1])。sum()'? – zero323

相關問題