2017-08-25 80 views
0

當我調用一個函數時,它就起作用了。但是當我在UDF中調用該函數將無法工作。UDF中的Spark classnotfoundexception

這是完整的代碼。

val sparkConf = new SparkConf().setAppName("HiveFromSpark").set("spark.driver.allowMultipleContexts","true") 
val sc = new SparkContext(sparkConf) 
val hive = new org.apache.spark.sql.hive.HiveContext(sc) 

///////////// UDFS 
def toDoubleArrayFun(vec:Any) : scala.Array[Double] = { 
    return vec.asInstanceOf[WrappedArray[Double]].toArray 
} 
def toDoubleArray=udf((vec:Any) => toDoubleArrayFun(vec)) 

//////////// PROCESS 
var df = hive.sql("select vec from mst_wordvector_tapi_128dim where word='soccer'") 
println("==== test get value then transform") 
println(df.head().get(0)) 
println(toDoubleArrayFun(df.head().get(0))) 

println("==== test transform by udf") 
df.withColumn("word_v", toDoubleArray(col("vec"))) 
.show(10); 

然後這個輸出。

sc: org.apache.spark.SparkContext = [email protected] 
hive: org.apache.spark.sql.hive.HiveContext = 
toDoubleArrayFun: (vec: Any)Array[Double] 
toDoubleArray: org.apache.spark.sql.UserDefinedFunction 
df: org.apache.spark.sql.DataFrame = [vec: array<double>] 
==== test get value then transform 
WrappedArray(-0.88675,, 0.0216657) 
[[email protected] 
==== test transform by udf 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 5, xdad008.band.nhnsystem.com): java.lang.ClassNotFoundException: $iwC$$iwC$$iwC$$iwC$$iwC$$$$5ba2a895f25683dd48fe725fd825a71$$$$$$iwC$$anonfun$toDoubleArray$1 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 

此處輸出完整。 https://gist.github.com/jeesim2/efb52f12d6cd4c1b255fd0c917411370

正如你所看到的「toDoubleArrayFun」函數運行良好,但在udf它聲明ClassNotFoundException。

我無法更改配置單元數據結構,並且需要將vec轉換爲Array [Double]才能生成Vector實例。

那麼上面的代碼有什麼問題?

星火版本是1.6.1

更新1

蜂巢表的 'VEC' 列類型爲 「array<double>

下面的代碼也會導致錯誤

var df = hive.sql("select vec from mst_wordvector_tapi_128dim where 
word='hh'") 
df.printSchema() 
var word_vec = df.head().get(0) 
println(word_vec) 
println(Vectors.dense(word_vec)) 

輸出

df: org.apache.spark.sql.DataFrame = [vec: array<double>] 
root 
|-- vec: array (nullable = true) 
| |-- element: double (containsNull = true) 
==== test get value then transform 
word_vec: Any = WrappedArray(-0.88675,...7) 
<console>:288: error: overloaded method value dense with alternatives: 
(values: Array[Double])org.apache.spark.mllib.linalg.Vector <and> 
(firstValue: Double,otherValues:Double*)org.apache.spark.mllib.linalg.Vector 
cannot be applied to (Any) 
println(Vectors.dense(word_vec)) 

這意味着配置單元'array<double>'列不能被鑄造到Array<Double> 其實我想計算距離:雙與兩個array<double>列。 如何添加基於array<double>列的矢量列?

典型方法是

Vectors.sqrt(Vectors.dense(Array<Double>, Array<Double>) 

回答

2

由於udf函數去序列化和反序列化過程,any數據類型將無法正常工作。您將必須定義傳遞給udf函數的列的確切數據類型。

從你的問題的輸出似乎你有你的數據幀只有一列,即vec這是Array[Double]類型的

df: org.apache.spark.sql.DataFrame = [vec: array<double>] 

其實沒有必要的是UDF功能爲您的vec列已經是的Array數據類型,這就是你的udf函數所做的,也就是將值轉換爲Array[Double]

現在,你的其他函數調用工作

println(toDoubleArrayFun(df.head().get(0))) 

,因爲沒有必要序列化和反序列化過程中,它只是斯卡拉函數調用。

+0

啊,可序列化的重點!但是,我如何使數據幀列'Array ',而不是任何?我更新了問題! –

+0

而不是你的udf函數中的'Any',只需將數據類型定義爲'WrappedArray [Double]',你應該沒問題。 :) –

+0

謝謝你的答案..順便說一句,當我設置參數類型爲'WrappedArray [雙]'而不是'任何'它失敗。 ':346:錯誤:類型不匹配;發現:任何。需要:scala.collection.mutable.WrappedArray [Double] println(Vectors.dense(toDoubleArrayFun(df.head()。get(0))))' –