2014-09-18

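Spark accumulableCollection does not work with mutable.Map

I am using Spark to accumulate employee records, and for that I use Spark's accumulators. I use a Map[empId, emp] as an accumulableCollection so that I can look up employees by their ID. I have tried everything, but it does not work. Can someone point out whether there is a logical problem in the way I am using accumulableCollection, or whether Map is simply not supported? My code is below.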

package demo 

import org.apache.spark.{SparkContext, SparkConf, Logging} 

import org.apache.spark.SparkContext._ 
import scala.collection.mutable 


object MapAccuApp extends App with Logging { 
    case class Employee(id:String, name:String, dept:String) 

    val conf = new SparkConf().setAppName("Employees") setMaster ("local[4]") 
    val sc = new SparkContext(conf) 

    implicit def empMapToSet(empIdToEmp: mutable.Map[String, Employee]): mutable.MutableList[Employee] = {
        empIdToEmp.foldLeft(mutable.MutableList[Employee]()) { (l, e) => l += e._2 }
    }

    val empAccu = sc.accumulableCollection[mutable.Map[String, Employee], Employee](mutable.Map[String,Employee]()) 

    val employees = List(
        Employee("10001", "Tom", "Eng"),
        Employee("10002", "Roger", "Sales"),
        Employee("10003", "Rafael", "Sales"),
        Employee("10004", "David", "Sales"),
        Employee("10005", "Moore", "Sales"),
        Employee("10006", "Dawn", "Sales"),
        Employee("10007", "Stud", "Marketing"),
        Employee("10008", "Brown", "QA")
    )

    System.out.println("employee count " + employees.size) 


    sc.parallelize(employees).foreach(e => {
        empAccu += e
    })

    System.out.println("empAccumulator size " + empAccu.value.size) 
} 
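
Why the map can end up empty can be modeled without Spark. As far as I can tell from the Spark 1.x source, accumulableCollection requires its collection type R to be viewable as Growable[T] with TraversableOnce[T] with Serializable, and its add step appends with += and then returns the collection object it was given. With T = Employee, a mutable.Map[String, Employee] only satisfies that bound through the implicit empMapToSet conversion, which builds a brand-new list on every call. The sketch below is a hypothetical plain-Scala stand-in for that add step (ConversionPitfall, addAccumulator and mapToList are illustrative names, not Spark API), assuming the conversion copies the way empMapToSet does.

```scala
import scala.collection.mutable

object ConversionPitfall {
  case class Employee(id: String, name: String, dept: String)

  // Hypothetical stand-in for an accumulator's add step: it appends through
  // `convert` and then returns the ORIGINAL accumulator object.
  def addAccumulator[R, T](acc: R, elem: T)(convert: R => mutable.Buffer[T]): R = {
    convert(acc) += elem // mutates whatever `convert` returned
    acc                  // unchanged if `convert` returned a copy
  }

  // Same behavior as the question's empMapToSet: a fresh buffer on every call.
  def mapToList(m: mutable.Map[String, Employee]): mutable.Buffer[Employee] =
    mutable.ListBuffer.empty[Employee] ++= m.values

  def main(args: Array[String]): Unit = {
    val empMap = mutable.Map[String, Employee]()
    val emps = List(
      Employee("10001", "Tom", "Eng"),
      Employee("10002", "Roger", "Sales"))

    emps.foreach(e => addAccumulator(empMap, e)(mapToList))

    // Every += landed in a throwaway ListBuffer, so the map is still empty.
    println(s"map size after adding: ${empMap.size}") // prints "map size after adding: 0"
  }
}
```

In the same model, a conversion that returns the object itself (for example a ListBuffer passed with b => b) does keep the element, so the problem only appears when the conversion copies.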

Answer


Using accumulableCollection seems like overkill for your problem, as the following demonstrates:

import org.apache.spark.{AccumulableParam, Accumulable, SparkContext, SparkConf} 

import scala.collection.mutable 

case class Employee(id:String, name:String, dept:String) 

val conf = new SparkConf().setAppName("Employees") setMaster ("local[4]") 
val sc = new SparkContext(conf) 

implicit def mapAccum =
    new AccumulableParam[mutable.Map[String,Employee], Employee] {
        // merge two partial maps into the first one
        def addInPlace(t1: mutable.Map[String,Employee],
                       t2: mutable.Map[String,Employee])
            : mutable.Map[String,Employee] = {
            t1 ++= t2
            t1
        }
        // add a single employee, keyed by its id
        def addAccumulator(t1: mutable.Map[String,Employee], e: Employee)
            : mutable.Map[String,Employee] = {
            t1 += (e.id -> e)
            t1
        }
        // fresh, empty map used as the starting value
        def zero(t: mutable.Map[String,Employee])
            : mutable.Map[String,Employee] = {
            mutable.Map[String,Employee]()
        }
    }

val empAccu = sc.accumulable(mutable.Map[String,Employee]()) 

val employees = List(
    Employee("10001", "Tom", "Eng"), 
    Employee("10002", "Roger", "Sales"), 
    Employee("10003", "Rafael", "Sales"), 
    Employee("10004", "David", "Sales"), 
    Employee("10005", "Moore", "Sales"), 
    Employee("10006", "Dawn", "Sales"), 
    Employee("10007", "Stud", "Marketing"), 
    Employee("10008", "Brown", "QA") 
) 

System.out.println("employee count " + employees.size) 

sc.parallelize(employees).foreach(e => { 
    empAccu += e 
}) 

println("empAccumulator size " + empAccu.value.size) 
empAccu.value.foreach(entry => 
    println("emp id = " + entry._1 + " name = " + entry._2.name)) 

This is poorly documented right now, but the relevant test in the Spark codebase is quite illuminating.
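
To see how the three methods of mapAccum fit together, here is a Spark-free simulation. It assumes a simplified picture of how Spark 1.x runs accumulators: each task starts from zero(initialValue), folds its elements in with addAccumulator, and the driver then merges each task's result into its own value with addInPlace. The AccumulableParamModel and MapParam objects and the two-way split are hypothetical; they mirror the method signatures above but are not Spark classes.

```scala
import scala.collection.mutable

object AccumulableParamModel {
  case class Employee(id: String, name: String, dept: String)
  type EmpMap = mutable.Map[String, Employee]

  // Same three operations as mapAccum, minus the Spark dependency.
  object MapParam {
    // Add one employee to a task-local map, keyed by id.
    def addAccumulator(acc: EmpMap, e: Employee): EmpMap = { acc += (e.id -> e); acc }
    // Merge one partial map into another.
    def addInPlace(a: EmpMap, b: EmpMap): EmpMap = { a ++= b; a }
    // Fresh empty map that each task starts from.
    def zero(initial: EmpMap): EmpMap = mutable.Map[String, Employee]()
  }

  def main(args: Array[String]): Unit = {
    val employees = List(
      Employee("10001", "Tom", "Eng"),
      Employee("10002", "Roger", "Sales"),
      Employee("10003", "Rafael", "Sales"),
      Employee("10004", "David", "Sales"),
      Employee("10005", "Moore", "Sales"))

    val driverValue: EmpMap = mutable.Map[String, Employee]()

    // Hypothetical split of the data into partitions of at most 3 elements.
    val partitions = employees.grouped(3).toList

    // Each "task" builds its own local map starting from zero(...).
    val taskResults = partitions.map { part =>
      part.foldLeft(MapParam.zero(driverValue))(MapParam.addAccumulator)
    }

    // The driver merges every task's local map into its own value.
    taskResults.foreach(r => MapParam.addInPlace(driverValue, r))

    println(s"partitions = ${partitions.size}, merged size = ${driverValue.size}")
    // prints "partitions = 2, merged size = 5"
  }
}
```

Because addAccumulator keys by id, adding the same employee twice in one task, or merging overlapping task results, still leaves one entry per id.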

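EDIT: It turns out that using accumulableCollection does have value: you don't need to define an AccumulableParam, and the following works. I'm leaving both solutions here in case they are useful to someone.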

import org.apache.spark.{SparkContext, SparkConf}

import scala.collection.mutable

case class Employee(id:String, name:String, dept:String)

val conf = new SparkConf().setAppName("Employees") setMaster ("local[4]") 
val sc = new SparkContext(conf) 

val empAccu = sc.accumulableCollection(mutable.HashMap[String,Employee]()) 

val employees = List(
    Employee("10001", "Tom", "Eng"), 
    Employee("10002", "Roger", "Sales"), 
    Employee("10003", "Rafael", "Sales"), 
    Employee("10004", "David", "Sales"), 
    Employee("10005", "Moore", "Sales"), 
    Employee("10006", "Dawn", "Sales"), 
    Employee("10007", "Stud", "Marketing"), 
    Employee("10008", "Brown", "QA") 
) 

System.out.println("employee count " + employees.size) 

sc.parallelize(employees).foreach(e => { 
    // notice this is different from the previous solution 
    empAccu += e.id -> e 
}) 

println("empAccumulator size " + empAccu.value.size) 
empAccu.value.foreach(entry => 
    println("emp id = " + entry._1 + " name = " + entry._2.name)) 

Both solutions were tested with Spark 1.0.2.
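
Why the EDIT version works can also be checked in plain Scala. There the element type is the pair (String, Employee), and a mutable.HashMap[String, Employee] already accepts += of such a pair directly, so no conversion (and no copy) is involved. Also note that -> binds more tightly than an assignment operator such as +=, so empAccu += e.id -> e parses as empAccu += (e.id -> e). The two local maps below are a hypothetical stand-in for per-task copies, and the ++= merge is an assumption about what Spark's collection-based param does; this is not Spark code.

```scala
import scala.collection.mutable

object TupleElementModel {
  case class Employee(id: String, name: String, dept: String)

  def main(args: Array[String]): Unit = {
    val employees = List(
      Employee("10006", "Dawn", "Sales"),
      Employee("10007", "Stud", "Marketing"),
      Employee("10008", "Brown", "QA"))

    // Two hypothetical per-task maps; each one is appended to directly.
    val taskA = mutable.HashMap[String, Employee]()
    val taskB = mutable.HashMap[String, Employee]()
    employees.take(2).foreach(e => taskA += e.id -> e) // same as taskA += (e.id -> e)
    employees.drop(2).foreach(e => taskB += e.id -> e)

    // Driver-side merge with ++= (assumed to mirror the collection-based param).
    val merged = mutable.HashMap[String, Employee]()
    merged ++= taskA
    merged ++= taskB

    println(s"merged size = ${merged.size}, 10008 -> ${merged("10008").name}")
    // prints "merged size = 3, 10008 -> Brown"
  }
}
```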


It looks like empAccu.value.size does not give the correct value, although printing works fine. I get the following output:

employee count 8
empAccumulator size 4
emp id = 10007 name = Stud
emp id = 10001 name = Tom
emp id = 10004 name = David
emp id = 10006 name = Dawn
emp id = 10003 name = Rafael
emp id = 10002 name = Roger
emp id = 10005 name = Moore
emp id = 10008 name = Brown

– smishra 2014-10-15 21:59:32
