2017-02-24 37 views
0

我具有被以下面的方式火花Scala的拋出「java.util.NoSuchElementException:鍵未找到」與廣播可變

// Function 
def loadMovieNames(sparkContext: SparkContext): Map[Int, String] = { 

    // Handle character encoding issues: 
    implicit val codec = Codec("UTF-8") 
    codec.onMalformedInput(CodingErrorAction.REPLACE) 
    codec.onUnmappableCharacter(CodingErrorAction.REPLACE) 

    // Create a Map of Ints to Strings, and populate it from u.item. 
    var movieNames: Map[Int, String] = Map() 

    val lines = sparkContext.textFile("s3a://bucket/movies.dat") 
    for (line <- lines) { 
     var fields = line.split("::") 
     if (fields.length > 1) { 
     movieNames += (fields(0).toInt -> fields(1)) 
     } 
    } 

    return movieNames 
} 

// Main 
val nameDict = loadMovieNames(spark.sparkContext) 
val broadcastNames = spark.sparkContext.broadcast(nameDict) 

下面構造的廣播變量是用於訪問該廣播在主代碼變量。

val resultDF = recommendationsDF.sort($"score".desc).limit(30) 

val check = (id1: Int, id2: Int) => if (id1 == movie) broadcastNames.value(id2) else broadcastNames.value(id1) 

val getName = udf(check) 

val results = resultDF.withColumn("movie", getName($"movieId1", $"movieId2")) 

results.show(30) 

但是,當我嘗試在稍後在main中的廣播變量中進行查找時,出現以下異常。

Caused by: java.util.NoSuchElementException: key not found: 1196 
at scala.collection.MapLike$class.default(MapLike.scala:228) 
at scala.collection.AbstractMap.default(Map.scala:59) 
at scala.collection.MapLike$class.apply(MapLike.scala:141) 
at scala.collection.AbstractMap.apply(Map.scala:59) 
at com.spark.movierec.MovieRecDF$$anonfun$5.apply(MovieRecDF.scala:144) 
at com.spark.movierec.MovieRecDF$$anonfun$5.apply(MovieRecDF.scala:143) 

當我最初遇到相同的問題時,我將Map轉換爲廣播變量。在閱讀了這個問題的答案here後,我意識到這可能是關閉問題。但我仍然不確定如何解決這個問題。創建本地地圖

+0

您可能需要考慮使用Map的get方法而不是apply方法。前者產生一個選項[V],在你的情況下,選項[字符串],然後你可以用邏輯方式處理。適用的問題是,如果您不提供默認值,它將引發異常。 – Phasmid

回答

1

一種方法是使用collectAsMap

val nameDict = sparkContext.broadcast(sparkContext 
    .textFile(path) 
    .map(_.split("::")) 
    .filter(_.size > 1) 
    .map(arr => (arr(0).toInt, arr(1))) 
    .collectAsMap()) 

你也應該考慮使用DataFrames和廣播到位UDF的連接和廣播變量,但應用程序的邏輯是不是清楚。