2017-08-10 72 views
0

我有一個自定義的udf並在spark中註冊。如果我嘗試訪問該UDF,則會拋出error.Unable訪問。火花中的UDF使用情況

我試過這樣。

spark.udf.register("rssi_weightage", FilterMap.rssi_weightage) 
val filterop = input_data.groupBy($"tagShortID", $"Timestamp", $"ListenerShortID", $"rootOrgID", $"subOrgID").agg(first(rssi_weightage($"RSSI").as("RSSI_Weight"))) 

顯示錯誤在第一(rssi_weightage($ 「RSSI」)// rssi_weightage沒有發現錯誤

任何幫助將不勝感激。

+0

我的解決方案能解決您的問題嗎?如果是,請接受答案 –

回答

2

這不是你如何使用UDF,實際UDF是spark.udf.register返回值那麼你可以做:

val udf_rssii_weightage = spark.udf.register("rssi_weightage", FilterMap.rssi_weightage) 

val filterop = input_data.groupBy($"tagShortID", $"Timestamp", $"ListenerShortID", $"rootOrgID", $"subOrgID").agg(first(udf_rssi_weightage($"RSSI")).as("RSSI_Weight")) 

但在你的情況,你不需要註冊UDF,只是用org.apache.spark.sql.functions.udf轉換一個單組LAR函數的UDF:

val udf_rssii_weightage = udf(FilterMap.rssi_weightage) 
+0

感謝您的解決方案... –

+0

好一個:) @Raphael,值得讚賞 –

1

我想你有你定義的UDF功能, 下一個快照在公告UDF稍微不同的方法方式的問題 - 它直接定義函數: 進口org.apache.spark.sql.functions._

val data = sqlContext.read.json(sc.parallelize(Seq("{'foo' : 'Bar'}", "{'foo': 'Baz'}"))) 

val example = Seq("Bar", "Bazzz") 
val urbf = udf { foo: String => if (example.contains(example)) 1 else 0 } 

data.select($"foo", urbf($"foo")).show 

+--------+-------------+ 
| foo |UDF(foo)  | 
+--------+-------------+ 
| Bar |   1| 
| Bazzz |   0| 
+--------+-------------+ 
+0

感謝您的解決方案... –