我有一個DataFrame
我想要動態創建多個UDF來確定某些行是否匹配。我現在只是測試一個例子。我的測試代碼如下所示。如何在Spark中動態創建UDF?
//create the dataframe
import spark.implicits._
val df = Seq(("t","t"), ("t", "f"), ("f", "t"), ("f", "f")).toDF("n1", "n2")
//create the scala function
def filter(v1: Seq[Any], v2: Seq[String]): Int = {
for (i <- 0 until v1.length) {
if (!v1(i).equals(v2(i))) {
return 0
}
}
return 1
}
//create the udf
import org.apache.spark.sql.functions.udf
val fudf = udf(filter(_: Seq[Any], _: Seq[String]))
//apply the UDF
df.withColumn("filter1", fudf(Seq($"n1"), Seq("t"))).show()
但是,當我運行最後一行時,出現以下錯誤。
:30: error: not found: value df df.withColumn("filter1", fudf($"n1", Seq("t"))).show() ^ :30: error: type mismatch; found : Seq[String] required: org.apache.spark.sql.Column df.withColumn("filter1", fudf($"n1", Seq("t"))).show() ^
關於我在做什麼的錯誤?請注意,我在Scala v2.11.x和Spark 2.0.x上。
另一方面,如果我們可以解決這個「動態」UDF問題/關注點,我的用例就是將它們添加到數據框中。用下面的一些測試代碼,它需要永遠(它甚至沒有完成,我不得不按CTRL-C分解)。我猜測在for循環中做一堆.withColumn
在Spark中是一個糟糕的主意。如果是這樣,請讓我知道,我會完全放棄這種方法。
import spark.implicits._
val df = Seq(("t","t"), ("t", "f"), ("f", "t"), ("f", "f")).toDF("n1", "n2")
import org.apache.spark.sql.functions.udf
val fudf = udf((x: String) => if (x.equals("t")) 1 else 0)
var df2 = df
for (i <- 0 until 10000) {
df2 = df2.withColumn("filter"+i, fudf($"n1"))
}
不知道你是想在這裏完成的,在你的列項都是字符串不是序列 –
基本上,我試圖創建一個UDF是需要列的任意數字(例如'$」 (例如「t」,「f」,...,「t」)和UDF內部的值,看看是否'n1 =「t」和n2 =「f」... n10 =「t」'。 –
這種類型的「過濾」可以很容易地用'df.where(「n1 ='t'和n2 ='f'...和n10 ='t')count'串行完成,但這種方法是不可並行,這樣的過濾器在每次發生Spark動作時都必須按順序運行。 –