2016-01-15 34 views
-1

我有以下代碼:星火:一個輸出HashSet的保存到文件

val mySet: HashSet[String] = HashSet[String]() 
val mySetBroadcastVar = sc.broadcast(mySet) 

val output = input.map { t => 

    if (t.getA()!= null) { 
    stSetBroadcastVar.value.add(t.getA()) 
    } 
}.count() 

sc.parallelize(myBroadcastVar.value.toList, 1).saveAsTextFile("mySetValues") 

然後將文件mySetValues始終是空的,即使它不應該。這是否因爲mySetValues在輸出計算之前已保存?我該如何解決這個問題?謝謝!

回答

5
  1. 廣播變量是在整個任務共享只讀數據和stagesin一個有效的方式
  2. 任務不應該修改廣播變量更新不會在任何其他節點反映,他們不運回到司機身邊。
  3. 您需要用於此目的的蓄電池。

實施例(從火花殼)

scala> val acc = sc.accumulableCollection(scala.collection.mutable.HashSet[String]()) 
acc: org.apache.spark.Accumulable[scala.collection.mutable.HashSet[String],String] = Set() 

scala> val names=sc.parallelize(Seq("aravind","sam","kenny","apple")) 
names: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[86] at parallelize at <console>:22 

scala> names.foreach(x => if(x.startsWith("a")) acc += x) 

scala> acc 
res27: org.apache.spark.Accumulable[scala.collection.mutable.HashSet[String],String] = Set(apple, aravind) 

scala> 
+1

是否有如何使用蓄能器用於這種情形的例子?我看到的另一個蓄能器例子基本上只是櫃檯。 – Edamame

+1

@Edamame添加了一個工作示例 –