2016-04-30 39 views
1

From the Spark source code for Accumulable is the addInPlace method從不同的分區合併相同累加的值:Apache Spark Accumulable addInPlace需要返回R1?或者任何價值?

/** 
* Merge two accumulated values together. Is allowed to modify and return the first value 
* for efficiency (to avoid allocating objects). 
* 
* @param r1 one set of accumulated data 
* @param r2 another set of accumulated data 
* @return both data sets merged together 
*/ 
def addInPlace(r1: R, r2: R): R 

我假設我能回到我想要的任何數值,當我在我的實現AccumulableParam的定義addInPlace。我正在假設我傳入的任何指針,因爲r1會指向我返回的任何內容。

我的老闆認爲傳入的r1是返回語句中唯一允許的東西。這聽起來是Ann-Landers-ish,誰是對的?

有一種情況是我只想丟棄r1並將其替換爲r2中的對象,這將是此合併累加器的新值。

我可以直接返回r2嗎?或者我必須對r1進行深層複製,因爲我的(更多的Java編程經驗)老闆認爲?要明確的是,雖然Spark當然是用Scala編寫的,但我正在用Java編寫一個實現AccumulableParam的類。

回答

2

作爲執行摺疊式操作時的經驗法則,您應該從不修改第二個參數。我們可以用一個簡單的例子來說明原因。讓我們假設我們有這樣簡單的累加器:

import org.apache.spark.AccumulatorParam 
import scala.collection.mutable.{Map => MMap} 

type ACC = MMap[String, Int] 

object DummyAccumulatorParam extends AccumulatorParam[ACC] { 
    def zero(initialValue: ACC): ACC = { 
    initialValue 
    } 

    def addInPlace(acc: ACC, v: ACC): ACC = { 
    v("x") = acc.getOrElse("x", 0) + v.getOrElse("x", 0) 
    v 
    } 
} 

這是特別有用,但沒關係。點是它修改了第二個參數。讓我們看看它是否有效:

val rdd = sc.parallelize(Seq(MMap("x" -> 1), MMap("x" -> 1), MMap("x" -> 1)), 1) 

val accum1 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam) 
rdd.foreach(x => accum1 += x) 

accum1.value 
// scala.collection.mutable.Map[String,Int] = Map(x -> 3) 

到目前爲止好。我們甚至可以創造一個又一個,它窗臺按預期工作:

val accum2 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam) 
rdd.foreach(x => accum2 += x) 

accum2.value 
// scala.collection.mutable.Map[String,Int] = Map(x -> 3) 

現在讓緩存中的數據:

rdd.cache 

重複的過程:

val accum3 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam) 
rdd.foreach(x => accum3 += x) 

val accum4 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam) 
rdd.foreach(x => accum4 += x) 

,並檢查蓄電池值:

accum4.value 
// scala.collection.mutable.Map[String,Int] = Map(x -> 6) 

and RD D內容:

​​

因此,您可以看到返回或修改第二個參數並不安全。它也適用於其他類似的操作,如foldaggregate

+0

真棒回答我們平常!同時,它給我留下了幾個問題。首先,這是否與'aggregate()'是一個轉換有關的任何方式,我們不能假設累加器只運行一次,即使是第一個參數?我認爲'foreach()'是安全的(基於行爲)的方式,每個任務只能看到一次累加器運行?雖然我認爲你正在提出一個不同的觀點。 – JimLohse

+0

第二,我看到你修改了RDD嗎?我認爲RDD是不可改變的,還是過於簡單?如果我認爲我可以**可靠地修改RDD元素,那麼我不會使用累加器:)以(1,1,1)結尾的RDD以(1,3,6)結尾,對嗎? – JimLohse

+1

不可變的數據結構並不意味着不可變的數據。如果不可變數據結構包含可變對象,則這些對象可能會更改。它們不能被另一組對象所取代。但是,在Spark中對數據進行變異是嚴重的編程錯誤,這裏僅用於說明爲什麼不應該改變/返回第二個參數。 – zero323