2017-08-24 41 views
0

我正在使用UDF函數將字符串應用於火花數據框中的一列,該字符串遍歷words字集,並且發現如果給定列字符串包含任何的自定字(見下文):Spark(scala)dataframes - 返回在給定字符串中找到的字集列表

udf { (s: String) => words.value.exists(word => s.contains(word)) } 

我怎麼會需要那麼它在words一套是返回所有項目的列表,以改變該功能在字符串中找到?

我一直在使用whenotherwise嘗試:

udf { (s: String) => when(words.value.exists(word => s.contains(word)), word).otherwise(null) } 

但我得到一個type mismatch,反正,我認爲這將只返回第一個匹配。我只是在學習scala和spark,所以歡迎任何建議。

回答

2

傳遞給你用這裏應該是一個簡單的斯卡拉功能的udf函數的參數 - 任何使用SQL的功能,如when將返回Column對象,這是沒有這些功能的預期返回類型(他們應該返回在Spark DataFrames中支持作爲數據類型的類型 - 基元,數組,映射,案例類等)。

因此,實現將簡單地是:

udf { (s: String) => words.value.filter(word => s.contains(word)) } 

這產生與輸入類型String和outpur類型Seq[String],這意味着所得的列將是AB Array列UDF。

例如:

val words = sc.broadcast(Seq("aaa", "bbb")) 
val udf1 = udf { (s: String) => words.value.filter(word => s.contains(word)) } 

Seq("aaabbbb", "bbb", "aabb").toDF("word").select(udf1($"word")).show() 
// +----------+ 
// | UDF(word)| 
// +----------+ 
// |[aaa, bbb]| 
// |  [bbb]| 
// |  []| 
// +----------+ 
+0

感謝@Tzach - 這聽起來簡直太容易了是真的。我只是試着運行這個並得到一個錯誤('java.lang.UnsupportedOperationException:Schema for scala.collection.immutable.Set [String] is not supported')。我的'詞彙'是不可變的集合(正在廣播)嗎? – renegademonkey

+1

是的,如果'words'是一個'Set [String]',則UDF的返回類型也變爲'Set [String]' - 這不是受支持的DataFrame類型。要解決這個問題,只需將結果轉換爲Seq:udf {(s:String)=> words.value.filter(word => s.contains(word))。toSeq} –

相關問題