2016-12-19 93 views
1

我需要在Spark中使用接受時間戳,整數和另一個數據框的UDF並返回3個值的元組。在Spark Scala中定義UDF

錯誤發生後,我不斷點擊錯誤,我不確定我是否試圖修復它。

下面是函數:

def determine_price (view_date: org.apache.spark.sql.types.TimestampType , product_id: Int, price_df: org.apache.spark.sql.DataFrame) : (Double, java.sql.Timestamp, Double) = { 
    var price_df_filtered = price_df.filter($"mkt_product_id" === product_id && $"created"<= view_date) 
    var price_df_joined = price_df_filtered.groupBy("mkt_product_id").agg("view_price" -> "min", "created" -> "max").withColumn("last_view_price_change", lit(1)) 
    var price_df_final = price_df_joined.join(price_df_filtered, price_df_joined("max(created)") === price_df_filtered("created")).filter($"last_view_price_change" === 1) 
    var result = (price_df_final.select("view_price").head().getDouble(0), price_df_final.select("created").head().getTimestamp(0), price_df_final.select("min(view_price)").head().getDouble(0)) 
    return result 
} 
val det_price_udf = udf(determine_price) 

它給我的錯誤是:

error: missing argument list for method determine_price 
Unapplied methods are only converted to functions when a function type is expected. 
You can make this conversion explicit by writing `determine_price _` or `determine_price(_,_,_)` instead of `determine_price`. 

如果我開始加入我請其他錯誤,如int預期Int.type發現運行參數或object DataFrame不是包的成員org.apache.spark.sql

給出一些上下文:

這個想法是我有一個價格數據框,一個產品ID和一個創建日期,另一個數據框包含產品ID和查看日期。

我需要確定基於哪個價格是上次創建的價格條目早於查看日期的價格。

由於每個產品ID在第二個數據框中有多個查看日期。我認爲UDF比交叉連接更快。如果有人有不同的想法,我會很感激。

回答

0

由於UDF將在特定分區上的工作人員上運行,因此無法傳遞UDF中的數據框。因爲你不能在Worker上使用RDD(Is it possible to create nested RDDs in Apache Spark?),所以同樣你也不能在Worker上使用DataFrame。

您需要爲此做一些工作!

+0

好吧,我從de UDF參數中刪除了數據框。數據幀被緩存和播放,它應該可以從函數內部訪問我仍然得到錯誤:'錯誤:類型不匹配; found:Int.type required:Int val det_price_udf = udf(determine_price(org.apache.spark.sql.types.TimestampType,Int))' – UrVal

+0

看起來,如果數據框不在UDF中,它不可能是用過的。這不像我習慣於Python的「全局變量」。不知道如何解決這個問題。 – UrVal

+0

你的用例是什麼? –