3
我正在使用Spark來完成員工記錄積累,並且使用了Spark的累加器。我使用Map [empId,emp]作爲accumulableCollection,以便我可以通過他們的ID搜索員工。我嘗試了一切,但它不起作用。有人指出,如果我使用accumulableCollection的方式有任何邏輯問題,或者不支持Map。下面是我的代碼Spark accumulableCollection不能與mutable配合使用
package demo
import org.apache.spark.{SparkContext, SparkConf, Logging}
import org.apache.spark.SparkContext._
import scala.collection.mutable
object MapAccuApp extends App with Logging {
case class Employee(id:String, name:String, dept:String)
val conf = new SparkConf().setAppName("Employees") setMaster ("local[4]")
val sc = new SparkContext(conf)
implicit def empMapToSet(empIdToEmp: mutable.Map[String, Employee]): mutable.MutableList[Employee] = {
empIdToEmp.foldLeft(mutable.MutableList[Employee]()) { (l, e) => l += e._2}
}
val empAccu = sc.accumulableCollection[mutable.Map[String, Employee], Employee](mutable.Map[String,Employee]())
val employees = List(
Employee("10001", "Tom", "Eng"),
Employee("10002", "Roger", "Sales"),
Employee("10003", "Rafael", "Sales"),
Employee("10004", "David", "Sales"),
Employee("10005", "Moore", "Sales"),
Employee("10006", "Dawn", "Sales"),
Employee("10007", "Stud", "Marketing"),
Employee("10008", "Brown", "QA")
)
System.out.println("employee count " + employees.size)
sc.parallelize(employees).foreach(e => {
empAccu += e
})
System.out.println("empAccumulator size " + empAccu.value.size)
}
看起來像empAccu.value.size沒有給出正確的值,打印工作正常。我得到以下輸出 ' **員工數8 ** ** 大小empAccumulator 4 ** EMP ID = 10007名=梭哈 EMP ID = 10001名=湯姆 EMP ID = 10004名=大衛 EMP ID = 10006 name = Dawn emp id = 10003 name = Rafael emp id = 10002 name = Roger emp id = 10005 name = Moore emp id = 10008 name = Brown ' – smishra 2014-10-15 21:59:32