
SparkSQL (Spark 1.3): UDF for date operations

I have a DataFrame with two string columns that hold date information (e.g. "2014-01-01"). I would like to treat these columns as dates and subtract them, so I tried defining a UDF based on examples I found on the internet, such as the following:

import java.util.Date  // DateTime.toDate returns a java.util.Date

import org.apache.spark.sql.types.DateType
import org.apache.spark.sql.functions._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

val d = DateTimeFormat.forPattern("yyyy-mm-dd'T'kk:mm:ssZ")
val dtFunc: (String => Date) = (arg1: String) => DateTime.parse(arg1, d).toDate
val dtFunc2 = udf(dtFunc)
val x = df.withColumn("dt", dtFunc2(col("dt_string")))

However, when I use it I get the following error:

scala.MatchError: java.util.Date (of class scala.reflect.internal.Types$TypeRef$$anon$6) 
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:112) 
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30) 
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:107) 
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30) 
    at org.apache.spark.sql.functions$.udf(functions.scala:402) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47) 
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49) 
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:51) 
    at $iwC$$iwC$$iwC.<init>(<console>:53) 
    at $iwC$$iwC.<init>(<console>:55) 
    at $iwC.<init>(<console>:57) 
    at <init>(<console>:59) 
    at .<init>(<console>:63) 
    at .<clinit>(<console>) 
    at .<init>(<console>:7) 
    at .<clinit>(<console>) 
    at $print(<console>) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) 
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856) 
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901) 
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813) 
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656) 
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664) 
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669) 
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996) 
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) 
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) 
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944) 
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058) 
    at org.apache.spark.repl.Main$.main(Main.scala:31) 
    at org.apache.spark.repl.Main.main(Main.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

Can you please help me with this? Thanks!

Answer


Spark SQL represents timestamps and dates as java.sql.Timestamp and java.sql.Date respectively; java.util.Date won't work here. You can simply extract the milliseconds from the parsed DateTime and pass them to the java.sql.Date constructor.
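For illustration, here is a minimal sketch of that fix applied to the question's code (assuming the same df and dt_string column; the month pattern is also written as MM here, since mm means minutes in Joda-Time):

import java.sql.Date

import org.apache.spark.sql.functions.{col, udf}
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

val fmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'kk:mm:ssZ")

// Return java.sql.Date, which Catalyst maps to DateType,
// instead of java.util.Date, which it cannot handle.
val dtFunc: String => Date = s => new Date(DateTime.parse(s, fmt).getMillis)
val dtFunc2 = udf(dtFunc)
val x = df.withColumn("dt", dtFunc2(col("dt_string")))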

That said, I would actually consider using HiveContext and Hive UDFs. For example, you can use unix_timestamp with an explicit pattern (following SimpleDateFormat) to convert a string to a Unix timestamp in seconds:

df.selectExpr("*",
  """unix_timestamp(dt_string, "yyyy-MM-dd'T'kk:mm:ss")""")

and then use a standard cast to get a date or timestamp.
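A minimal sketch of that last step, assuming df was created from a HiveContext (so the unix_timestamp Hive UDF is available); the ts_seconds alias and the dt_ts / dt_date column names are illustrative, not from the original answer:

// unix_timestamp returns seconds since the epoch as a bigint.
val withTs = df.selectExpr(
  "*",
  """unix_timestamp(dt_string, "yyyy-MM-dd'T'kk:mm:ss") AS ts_seconds""")

// An integral value cast to timestamp is interpreted as seconds,
// and a timestamp cast to date drops the time-of-day part.
val result = withTs.selectExpr(
  "*",
  "CAST(ts_seconds AS timestamp) AS dt_ts",
  "CAST(CAST(ts_seconds AS timestamp) AS date) AS dt_date")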