2016-12-22 55 views
1

我有一個DataFrame我想要動態創建多個UDF來確定某些行是否匹配。我現在只是測試一個例子。我的測試代碼如下所示。如何在Spark中動態創建UDF?

//create the dataframe 
import spark.implicits._ 
val df = Seq(("t","t"), ("t", "f"), ("f", "t"), ("f", "f")).toDF("n1", "n2") 

//create the scala function 
def filter(v1: Seq[Any], v2: Seq[String]): Int = { 
    for (i <- 0 until v1.length) { 
    if (!v1(i).equals(v2(i))) { 
     return 0 
    } 
    } 
    return 1 
} 

//create the udf 
import org.apache.spark.sql.functions.udf 
val fudf = udf(filter(_: Seq[Any], _: Seq[String])) 

//apply the UDF 
df.withColumn("filter1", fudf(Seq($"n1"), Seq("t"))).show() 

但是,當我運行最後一行時,出現以下錯誤。

 
:30: error: not found: value df 
     df.withColumn("filter1", fudf($"n1", Seq("t"))).show() 
    ^
:30: error: type mismatch; 
found : Seq[String] 
required: org.apache.spark.sql.Column 
     df.withColumn("filter1", fudf($"n1", Seq("t"))).show() 
              ^

關於我在做什麼的錯誤?請注意,我在Scala v2.11.x和Spark 2.0.x上。

另一方面,如果我們可以解決這個「動態」UDF問題/關注點,我的用例就是將它們添加到數據框中。用下面的一些測試代碼,它需要永遠(它甚至沒有完成,我不得不按CTRL-C分解)。我猜測在for循環中做一堆.withColumn在Spark中是一個糟糕的主意。如果是這樣,請讓我知道,我會完全放棄這種方法。

import spark.implicits._ 
val df = Seq(("t","t"), ("t", "f"), ("f", "t"), ("f", "f")).toDF("n1", "n2") 

import org.apache.spark.sql.functions.udf 
val fudf = udf((x: String) => if (x.equals("t")) 1 else 0) 

var df2 = df 
for (i <- 0 until 10000) { 
    df2 = df2.withColumn("filter"+i, fudf($"n1")) 
} 
+0

不知道你是想在這裏完成的,在你的列項都是字符串不是序列 –

+0

基本上,我試圖創建一個UDF是需要列的任意數字(例如'$」 (例如「t」,「f」,...,「t」)和UDF內部的值,看看是否'n1 =「t」和n2 =「f」... n10 =「t」'。 –

+0

這種類型的「過濾」可以很容易地用'df.where(「n1 ='t'和n2 ='f'...和n10 ='t')count'串行完成,但這種方法是不可並行,這樣的過濾器在每次發生Spark動作時都必須按順序運行。 –

回答

1

包圍「T」lit()

df.withColumn("filter1", fudf($"n1", Seq(lit("t")))).show() 

嘗試在註冊sqlContext UDF。

Spark 2.0 UDF registration

+0

你可以用任意數量的列和lits顯示一個例子嗎? –

+0

'Seq(lit(「t」),lit(「f」))'。如果我們不使用'lit'作爲字符串,它會被視爲coulmn的名字。 – mrsrinivas