2017-06-05 34 views
0

我有以下UDF:無法執行用戶定義的函數

val jac_index:(Array[String],Array[String])=>Float=(Sq1:Array[String],Sq2:Array[String])=> 
{ 
    val Sq3=Sq1.intersect(Sq2) 
    val Sq4=Sq1.union(Sq2).distinct 
    if (!Sq4.isEmpty) Sq3.length.toFloat/Sq4.length.toFloat else 0F 
} 
val jacUDF=udf(jac_index) 

,當我執行下面的句子

val movie_jac_df=movie_pairs_df.withColumn("jac",jacUDF(movie_pairs_df("name"),movie_pairs_df("name2"))) 

我得到「無法執行用戶定義的函數」

錯誤

的架構movie_pairs_df是以下內容

root 
|-- movie: string (nullable = true) 
|-- name: array (nullable = true) 
| |-- element: string (containsNull = true) 
|-- movie2: string (nullable = true) 
|-- name2: array (nullable = true) 
| |-- element: string (containsNull = true) 

那麼原因是什麼?

回答

1

Spark的DataFrame模型Array列爲mutable.WrappedArray,這意味着您的UDF應該將兩個WrappedArrays作爲其輸入;

如果更改jac_index期望兩個這樣的數組:

import scala.collection.mutable 

val jac_index: (mutable.WrappedArray[String], mutable.WrappedArray[String]) => Float = 
    (Sq1, Sq2) => { /* same implementation */ } 

這將正常工作。

+0

它的工作原理,謝謝! – leonfrank

0

定義UDF如下

val jacUDF = udf((Sq1:mutable.WrappedArray[String], Sq2:mutable.WrappedArray[String]) => { 
    val Sq3=Sq1.intersect(Sq2) 
    val Sq4=Sq1.union(Sq2).distinct 
    if (!Sq4.isEmpty) Sq3.length.toFloat/Sq4.length.toFloat else 0F 
})