2016-01-18 39 views
2

我注意到一個奇怪的行爲,而使用Spark的項目使用Scala 2.10,我正在閱讀一個屬性文件,並寫入一個地圖(loadConfig)內的所有內容,我也創建了一個簡單方法,該方法返回給定鍵的值。Scala類懶惰val變量奇怪的行爲與火花

問題是,當我得到的所有的lazy val類變量列入黑名單的名字時,namesBlackList顯示爲空東陽我所有的Person已經「完全訪問」標籤,至極是不正確的

然而,當我寫namesBlackList裏面filterAccess然後一切工作完好。

ConfigManager.scala

object ConfigManager extends Serializable { 

    private var configMap = Map.empty[String, String] 

    def loadConfig(configPath:String) = { 
    // Reads a key/value properties file and writes it in the configmap 
    } 

    def getParameter(parameter: String): String = configMap.getOrElse(parameter, s"${parameter}=>UNKNOWN") 
} 

AnalyseData.scala

object AnalyseData extends Serializable { 

    private lazy val namesBlackList = ConfigManager.getParameter("names.blacklist").toSet 

    def filterAccess(rdd:RDD[Person]) : RDD[Person] = { 
     rdd.map {person => 
      if (namesBlackList.contains(person.firstName)) 
      (person.firstName,person.lastName,"limited access") 
      else 
      (person.firstName,person.lastName,"full Access") 
     } 
    } 
} 

AnalyseService.scala

object AnalyseService extends Serializable { 
    def main(path:String) { 
     ConfigManager.loadConfig(path) 

     val datas = createNameRdd // reads from a db and create a RDD[Person] 

     val filteredData = AnalyseData.filterAccess(datas) 

    } 
} 

我試圖調整我的代碼中的所有內容,看起來,由於Spark以lazy的方式執行map方法,因此將Singleton對象的結果設置爲lazy val類變量將不會產生正確的結果。 我不明白爲什麼它不工作,更重要的是,我真的找不到如何解決這個問題,除了在方法內調用namesBlackList

感謝您的意見。

+0

似乎就像ConfigManager在'namesBlackList'值初始化時還沒有準備好。你知道它什麼時候被初始化了嗎?你能否展示'excludeNames'方法的主體,即它的工作原理? –

+0

我在寫我的文章時犯了一個錯誤,excludeNames實際上是filterAccess,請參閱我編輯的文章。 – Will

回答

3

請參閱https://spark.apache.org/docs/latest/programming-guide.html#understanding-closures-a-nameclosureslinka瞭解所需的一些術語和概念。你的情況會發生什麼事(我想):

  1. ConfigManager.loadConfig(path)司機節點上運行。在那裏初始化configMap

  2. filterAccess,namesBlackList是一個真正的方法調用。因此,當map中的代碼在工作節點上執行時,此調用發生在那裏,並在同一節點上訪問configMap,該節點爲空。

  3. 然而,當你「寫namesBlackList內filterAccess」,那麼它是一個局部變量,它確實成爲關閉工作的一部分,而且是序列化。

要解決此問題,您需要爲configMap使用broadcast variable。像

object ConfigManager extends Serializable { 

    private var configMap: Broadcast[Map[String, String]] = _ 

    def loadConfig(configPath:String) = { 
    // Reads a key/value properties file and writes it in the configmap 
    } 

    def getParameter(parameter: String): String = configMap.value.getOrElse(parameter, s"${parameter}=>UNKNOWN") 
} 

更好的東西,以避免var

def main(path:String) { 
    val configMap = ConfigManager.loadConfig(path) 

    val datas = createNameRdd(configMap) // reads from a db and create a RDD[Person] 

    val filteredData = AnalyseData.filterAccess(datas, configMap) 
} 
+0

這解釋了爲什麼我的單元測試工作,而不是當我在雙節點集羣中運行它時程序本身。謝謝 ! – Will

2

也許你可以嘗試迫使lazy valfilterAccess方法內(但封閉外),像這樣:

object AnalyseData extends Serializable { 

    private lazy val namesBlackList = ConfigManager.getParameter("names.blacklist").toSet 

    def filterAccess(rdd:RDD[Person]) : RDD[Person] = { 
     val localNamesBlackList = namesBlackList  //force the lazy val... 
     rdd.map {person => 
     if (localNamesBlackList.contains(person.firstName)) 
      (person.firstName,person.lastName,"limited access") 
     else 
      (person.firstName,person.lastName,"full Access") 
    } 
    } 
}