2016-03-07 69 views
1

我想在Spark SQL作業中使用DSL而不是純SQL,但是我無法獲得我的UDF工作。udf在Spark SQL中DSL

sqlContext.udf.register("subdate",(dateTime: Long)=>dateTime.toString.dropRight(6)) 

這不起作用

rdd1.toDF.join(rdd2.toDF).where("subdate(rdd1(date_time)) === subdate(rdd2(dateTime))") 

我還想添加其他參加就像這個工作純粹SQL條件

val results=sqlContext.sql("select * from rdd1 join rdd2 on rdd1.id=rdd2.idand subdate(rdd1.date_time)=subdate(rdd2.dateTime)") 

感謝您的幫助

回答

2

SQL您傳遞給where方法的表達式不正確,至少有幾個原因:

  • ===是一個Column方法不是有效的SQL等式。您應該使用單個等號=
  • 括號表示法(table(column))不是引用SQL中的列的有效方法。在這種情況下,它將被識別爲函數調用。 SQL使用點符號(table.column
  • 即使它既不rdd1也不rdd2是有效的表的別名

因爲它看起來像列名是明確的,你可以簡單地使用下面的代碼:

df1.join(df2).where("subdate(date_time) = subdate(dateTime)") 

如果不是這種情況,使用點語法不會首先提供別名。例如參見Usage of spark DataFrame "as" method

此外,註冊UDF通常在您始終使用原始SQL時很有意義。如果你想使用DataFrame API,最好是直接使用UDF:

import org.apache.spark.sql.functions.udf 

val subdate = udf((dateTime: Long) => dateTime.toString.dropRight(6)) 

val df1 = rdd1.toDF 
val df2 = rdd2.toDF 

df1.join(df2, subdate($"date_time") === subdate($"dateTime")) 

,或者如果列名是模棱兩可:

df1.join(df2, subdate(df1("date_time")) === subdate(df2("date_time"))) 

最後,對於簡單的功能這樣最好是組成內置表達式比創建UDF。

+0

非常感謝。通過編寫內置表達式,你的意思是什麼?從sql.Column包中使用「substr」類似的函數? – vgkowski

+0

或多或少。這裏有一些微妙的東西(並不是每個函數都是用表達式來實現的),但是不要糾纏於此。如果這有幫助,請不要感謝 - 只接受和/或upvote :) – zero323