
Spark UDF with non-column parameters

I want to pass a variable, not a column, to a UDF in Spark.

The map is built as in Spark dataframe to nested map, and I use it like this:

val joinUDF = udf((replacementLookup: Map[String, Double], newValue: String) => {
  replacementLookup.get(newValue) match {
    case Some(tt) => tt
    case None => 0.0
  }
})

(columnsMap).foldLeft(df) {
  (currentDF, colName) => {
    println(colName._1)
    println(colName._2)
    currentDF
      .withColumn("myColumn_" + colName._1, joinUDF(colName._2, col(colName._1)))
  }
}

This should do the mapping, but it throws:

type mismatch; 
[error] found : Map 
[error] required: org.apache.spark.sql.Column 
[error]   .withColumn("myColumn_" + colName._1, joinUDF(colName._2, col(colName._1))) 

Answers

2 votes

If you want to pass a literal to a UDF, use org.apache.spark.sql.functions.lit,

i.e. joinUDF(lit(colName._2), col(colName._1)).
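For plain scalar values this works directly. A minimal sketch, assuming a hypothetical addOffset UDF and a DataFrame df with an integer column named IntColumn:

import org.apache.spark.sql.functions.{col, lit, udf}

// Hypothetical UDF: the non-column parameter is passed in via lit()
val addOffset = udf((offset: Double, value: Int) => value + offset)
// df is assumed to have an integer column "IntColumn"
df.withColumn("shifted", addOffset(lit(1.5), col("IntColumn")))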

But a Map is not supported as a literal type, so you have to rewrite your code, e.g. by applying the map argument before creating the UDF:

val joinFunction = (replacementLookup: Map[String, Double], newValue: String) => {
  replacementLookup.get(newValue) match {
    case Some(tt) => tt
    case None => 0.0
  }
}

(columnsMap).foldLeft(df) {
  (currentDF, colName) => {
    val joinUDF = udf(joinFunction(colName._2, _: String))
    currentDF
      .withColumn("myColumn_" + colName._1, joinUDF(col(colName._1)))
  }
}
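The partial application joinFunction(colName._2, _: String) bakes the current map into a one-argument function before udf wraps it. A more explicit sketch of what happens inside the foldLeft body (lookup is just a local name introduced here):

// Equivalent to udf(joinFunction(colName._2, _: String)) inside the loop
val lookup: Map[String, Double] = colName._2  // per-column lookup map
val joinUDF = udf((newValue: String) => joinFunction(lookup, newValue))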

But that yields "Unsupported literal type class scala.collection.immutable.Map".


OK, that means you cannot pass a map to a UDF :) so you have to refactor your code, e.g. with a closure, the way you are doing it now.


Thanks. Literals work great.

2 votes

You can use currying:

import org.apache.spark.sql.functions._ 
val df = Seq(("a", 1), ("b", 2)).toDF("StringColumn", "IntColumn") 

def joinUDF(replacementLookup: Map[String, Double]) = udf((newValue: String) => {
  replacementLookup.get(newValue) match {
    case Some(tt) => tt
    case None => 0.0
  }
})

val myMap = Map("a" -> 1.5, "b" -> 3.0) 

df.select(joinUDF(myMap)($"StringColumn")).show() 
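To apply this per column as in the question, the curried UDF also slots into the foldLeft. A sketch, assuming a hypothetical columnsMap from column name to lookup map:

// Hypothetical: one lookup map per column name
val columnsMap: Map[String, Map[String, Double]] = Map("StringColumn" -> myMap)

val result = columnsMap.foldLeft(df) { case (currentDF, (name, lookup)) =>
  currentDF.withColumn("myColumn_" + name, joinUDF(lookup)(col(name)))
}
result.show()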

Alternatively, you can also try a broadcast variable:

import org.apache.spark.sql.functions._ 
val df = Seq(("a", 1), ("b", 2)).toDF("StringColumn", "IntColumn") 

val myMap = Map("a" -> 1.5, "b" -> 3.0) 
val broadcastedMap = sc.broadcast(myMap) 

def joinUDF = udf((newValue: String) => {
  broadcastedMap.value.get(newValue) match {
    case Some(tt) => tt
    case None => 0.0
  }
})

df.select(joinUDF($"StringColumn")).show()
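Broadcasting is mainly worthwhile when the lookup map is large: a broadcast variable is shipped to each executor once and cached there, whereas a map captured in a plain closure is serialized with every task.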