2017-02-17

How do I create a custom accumulator of type Set[String]?

I want to use a custom accumulator in Apache Spark to accumulate a set. The result should have type Set[String]. For this I created a custom accumulator:

object SetAccumulatorParam extends AccumulatorParam[Set[String]] { 
    def addInPlace(r1: mutable.Set[String], r2: mutable.Set[String]): mutable.Set[String] = { 
     r1 ++= r2 
    } 

    def zero(initialValue: mutable.Set[String]): mutable.Set[String] = { 
     Set() 
    } 
} 

But I cannot instantiate a variable of this type.

val tags = sc.accumulator(Set(""))(SetAccumulatorParam) 

fails with the error below. Please help.

required: org.apache.spark.AccumulatorParam[Set[String]] 

What you did looks very different from the official documentation (http://spark.apache.org/docs/latest/programming-guide.html#accumulators). I'm also skeptical about the use of an object here, since I'd assume Spark wants to instantiate this accumulator at some point. – LiMuBei

Answers


Update for Spark 1.6 (note the fix: use immutable Set[String] consistently, matching the trait's type parameter, instead of mixing in mutable.Set):

object StringSetAccumulatorParam extends AccumulatorParam[Set[String]] { 
    def zero(initialValue: Set[String]): Set[String] = { Set() } 
    def addInPlace(s1: Set[String], s2: Set[String]): Set[String] = { s1 ++ s2 } 
} 

val stringSetAccum = sc.accumulator(Set[String]())(StringSetAccumulatorParam) 
sc.parallelize(Array("1", "2", "3", "1")).foreach(s => stringSetAccum += Set(s)) 
stringSetAccum.value.toString 
res0: String = Set(2, 3, 1) 
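Note that each element is wrapped as `Set(s)` before being added: with `AccumulatorParam[Set[String]]`, the increment type is `Set[String]` itself, not `String`. Duplicates collapse on merge, which is the point of using a set. A minimal sketch (assuming `sc` is a live `SparkContext` and the param object above is in scope):

```scala
// The += operand must itself be a Set[String]; single elements are wrapped.
val acc = sc.accumulator(Set[String]())(StringSetAccumulatorParam)
sc.parallelize(Seq("x", "y", "x")).foreach(s => acc += Set(s))
acc.value // duplicates collapse: contains exactly "x" and "y"
```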

In Spark 2.0 you are probably fine using the existing collectionAccumulator (if you care about distinct values, you can check for existence and add values only if they are not already there):

val collAcc = spark.sparkContext.collectionAccumulator[String]("myCollAcc") 
collAcc: org.apache.spark.util.CollectionAccumulator[String] = CollectionAccumulator(id: 32154, name: Some(myCollAcc), value: []) 

spark.sparkContext.parallelize(Array("1", "2", "3")).foreach(s => collAcc.add(s)) 

collAcc.value.toString 
res0: String = [3, 2, 1] 
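Checking the accumulator from inside the executors is unreliable (each task only sees its own local copy), so a safer way to keep the values distinct is to deduplicate the RDD before adding. An illustrative sketch, assuming a running `SparkSession` named `spark`:

```scala
val distinctAcc = spark.sparkContext.collectionAccumulator[String]("myDistinctAcc")

spark.sparkContext
  .parallelize(Array("1", "2", "3", "1"))
  .distinct()                       // removes duplicates across partitions
  .foreach(s => distinctAcc.add(s))

distinctAcc.value // each of "1", "2", "3" appears exactly once
```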

更多信息:https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2


Adding to the answer above, here is a generic SetAccumulator for Spark 2.x:

import org.apache.spark.util.AccumulatorV2 

class SetAccumulator[T](var value: Set[T]) extends AccumulatorV2[T, Set[T]] { 
    def this() = this(Set.empty[T]) 
    override def isZero: Boolean = value.isEmpty 
    override def copy(): AccumulatorV2[T, Set[T]] = new SetAccumulator[T](value) 
    override def reset(): Unit = value = Set.empty[T] 
    override def add(v: T): Unit = value = value + v 
    override def merge(other: AccumulatorV2[T, Set[T]]): Unit = value = value ++ other.value 
} 

And you can use it like this:

val accum = new SetAccumulator[String]() 
spark.sparkContext.register(accum, "My Accum") // Optional, name it for SparkUI 

spark.sparkContext.parallelize(Seq("a", "b", "a", "b", "c")).foreach(s => accum.add(s)) 

accum.value 

which outputs:

Set[String] = Set(a, b, c)
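For reference, you never call `copy`, `reset`, or `merge` yourself; Spark does that when it ships per-task copies to the executors and folds the results back on the driver. A local sketch of the merge semantics (assuming the `SetAccumulator` class above is in scope; no cluster needed):

```scala
// Two per-partition copies being merged, as Spark would do on task completion.
val partA = new SetAccumulator[String](Set("a", "b"))
val partB = new SetAccumulator[String](Set("b", "c"))
partA.merge(partB)
partA.value // Set(a, b, c) -- overlapping elements are collapsed
```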