2016-07-06 51 views
1

我正在嘗試編寫可以在Spark SQL中的Dataframes中工作的UDF。創建與DataFrame和SQL API兼容的UDF

下面是代碼

def Timeformat (timecol1: Int) = { 
    if (timecol1 >= 1440) 
     ("%02d:%02d".format((timecol1-1440)/60, (timecol1-1440)%60)) 
    else 
     ("%02d:%02d".format((timecol1)/60, (timecol1)%60)) 
} 

sqlContext.udf.register("Timeformat", Timeformat _) 

這種方法完全適用的sqlcontext

val dataa = sqlContext.sql("""select Timeformat(abc.time_band) from abc""") 

使用DF - 獲取錯誤 val fcstdataa = abc.select(Timeformat(abc("time_band_start")))

此方法拋出一個類型不匹配錯誤。

<console>:41: error: type mismatch; 
found : org.apache.spark.sql.Column 
required: Int 

當我重新編寫UDF如下,對DF工作完美,但不工作在Sqlcontext。有什麼辦法來解決這個問題,而無需創建多個UDF的做同樣的事情

val Timeformat = udf((timecol1: Int) => 
    if (timecol1 >= 1440) 
     ("%02d:%02d".format((timecol1-1440)/60, (timecol1-1440)%60)) 
    else 
     ("%02d:%02d".format((timecol1)/60, (timecol1)%60)) 
) 

我非常新的Scala和火花,這是什麼兩個聲明之間的區別。一種方法比其他方法更好嗎?

回答

1

在這裏使用UDF並沒有什麼意義,但如果您真的想要這樣做,請不要使用匿名函數。就拿你已經有了(一Int => String)功能,並使用其包裝UDF:

def Timeformat(timecol1: Int): String = ??? 
sqlContext.udf.register("Timeformat", Timeformat _) 
val timeformat_ = udf(Timeformat _) 

另外,您可以callUDF

import org.apache.spark.sql.functions.callUDF 

abc.select(callUDF("Timeformat", $"time_band_start")) 

話雖這麼說非UDF的解決方案應該是首選的大部分時間:

import org.apache.spark.sql.Column 
import org.apache.spark.sql.functions.{when, format_string} 

def timeformatExpr(col: Column) = { 
    val offset = when(col >= 1440, 1440).otherwise(0) 
    val x = ((col - offset)/60).cast("int") 
    val y = (col - offset) % 60 
    format_string("%02d:%02d", x, y) 
} 

其等同於以下SQL:

val expr = """CASE 
    WHEN time_band >= 1440 THEN 
     FORMAT_STRING(
      '%02d:%02d', 
      CAST((time_band - 1440)/60 AS INT), 
      (time_band - 1440) % 60 
    ) 
    ELSE 
     FORMAT_STRING(
      '%02d:%02d', 
      CAST(time_band/60 AS INT), 
      time_band % 60 
    ) 
END""" 

可以在原始SQL中使用以及DataFrame使用selectExprexpr函數。

例子

val df = Seq((1L, 123123), (2L, 42132), (3L, 99)).toDF("id", "time_band") 

df.select($"*", timeformatExpr($"time_band").alias("tf")).show 
// +---+---------+-------+ 
// | id|time_band|  tf| 
// +---+---------+-------+ 
// | 1| 123123|2028:03| 
// | 2| 42132| 678:12| 
// | 3|  99| 01:39| 
// +---+---------+-------+ 

df.registerTempTable("df") 

sqlContext.sql(s"SELECT *, $expr AS tf FROM df").show 
// +---+---------+-------+ 
// | id|time_band|  tf| 
// +---+---------+-------+ 
// | 1| 123123|2028:03| 
// | 2| 42132| 678:12| 
// | 3|  99| 01:39| 
// +---+---------+-------+ 

df.selectExpr("*", s"$expr AS tf").show 
// +---+---------+-------+ 
// | id|time_band|  tf| 
// +---+---------+-------+ 
// | 1| 123123|2028:03| 
// | 2| 42132| 678:12| 
// | 3|  99| 01:39| 
// +---+---------+-------+ 
+0

當然,檢查編輯。 – zero323

+1

非常感謝。 – archerarjun