2015-07-18 17 views
5

我想在斯卡拉殼(司機)來定義一個String類型的累加器變量,但我不斷收到以下錯誤: -無法申報串式蓄能器

scala> val myacc = sc.accumulator("Test") 
<console>:21: error: could not find implicit value for parameter param: org.apache.spark.AccumulatorParam[String] 
     val myacc = sc.accumulator("Test") 
           ^

這似乎是沒有問題Int或Double類型的累加器。

感謝

回答

10

這是因爲星火默認提供Long類型,DoubleFloat只蓄電池。如果你需要別的東西,你必須延長AccumulatorParam

import org.apache.spark.AccumulatorParam 

object StringAccumulatorParam extends AccumulatorParam[String] { 

    def zero(initialValue: String): String = { 
     "" 
    } 

    def addInPlace(s1: String, s2: String): String = { 
     s"$s1 $s2" 
    } 
} 

val stringAccum = sc.accumulator("")(StringAccumulatorParam) 

val rdd = sc.parallelize("foo" :: "bar" :: Nil, 2) 
rdd.foreach(s => stringAccum += s) 
stringAccum.value 

注意

一般來說,你應該避免使用蓄電池對於其中的數據可能會隨時間增長顯著任務。其行爲類似於groupcollect,並且在最壞的情況下可能由於缺乏資源而失敗。累加器主要用於簡單的診斷任務,例如跟蹤基本統計數據。

+0

對於整數累加器,val accum = sc.accumulator(0)(SparkContext.IntAccumulatorParam) 。 – Neethu