2016-08-03 44 views
2

我想要做這樣的事情。如何將Spark的累加器傳遞給函數?

val ac = sc.accumulator(0) 
.... 
a = a.map(x => someFunction(x, the_accumulator_object)) 
.... 

應在the_accumulator_ojbect在上面的代碼的地方是什麼?會寫ac那裏就好了嗎?

此外,在功能

def someFunction(x: TypeOfX, a: TypeOfAccumulator) : ReturnType = 
{ 
    ..... 
} 

應在TypeOfAccumulator在上面的函數的地方是什麼?

回答

4

約星火蓄電池附加信息可以發現here

根據關於建立累加器的斯卡拉-文檔:

/** *創建一個[org.apache.spark。累加器]]給定類型的 變量,並在Spark UI中顯示*的名稱。使用+=方法,任務可以將 「累加」到累加器。只有* 驅動程序可以訪問累加器的value。 */

默認累加器類型爲int。你雖然可以設置自己的類型,但需要正確實施+=方法將值添加到您自己的蓄電池類型:

val ac = sc.accumulator[MyOwnType](MyOwnTypeObject, "my own type object accumulator") 

你的主要代碼片段將是這樣的:

val ac = sc.accumulator(0, "some accumulator") 
.... 
a = a.map(x => someFunction(x, ac)) 
.... 
System.out.println("My accumulator value is: " + ac.value) 

someFunction方法植入將如下所示:

def someFunction(x: TypeOfX, ac: Accumulator[Int]) : ReturnType = 
{ 
    ... 
    ac += 1 
    ... 
} 
+0

什麼是累加器聲明中的第二個參數?這是爲了識別不同的累加器嗎?其次,我可以使用任何類型的累加器。例如,我想使用Set類型。 – pythonic

+0

編輯答案並回答您的問題。 –

+0

很酷。謝謝。我會嘗試你的解決方案。 – pythonic