2016-11-30 35 views
1

I tried searching, but I could only find the add_months function in Spark SQL, so I'm opening a new thread here. Any help anyone can provide would be appreciated. Adding 12 hours to a datetime column in Spark

I want to add 12, 24, and 48 hours to a date column in Spark SQL using sqlContext. I'm running Spark 1.6.1, and I need something like this:

SELECT N1.subject_id, '12-HOUR' AS notes_period, N1.chartdate_start, N2.chartdate, N2.text 
FROM NOTEEVENTS N2, 
(SELECT subject_id, MIN(chartdate) chartdate_start 
    FROM NOTEEVENTS 
    WHERE subject_id = 283 
    AND category != 'Discharge summary' 
GROUP BY subject_id) N1 
WHERE N2.subject_id = N1.subject_id 
and n2.chartdate < n1.chartdate_start + interval '1 hour' * 12 

Please note the last condition, which is written in PostgreSQL syntax; that is exactly what I need in Spark SQL. I'd really appreciate any help I can get.

Thanks.

Answers

4

There is currently no such built-in function, but you can write a UDF:

import java.sql.Timestamp

// register a UDF that shifts a timestamp by the given number of hours
sqlContext.udf.register("add_hours", (datetime: Timestamp, hours: Int) => {
    new Timestamp(datetime.getTime() + hours * 60L * 60 * 1000)
})

For example:

SELECT N1.subject_id, '12-HOUR' AS notes_period, N1.chartdate_start, N2.chartdate, N2.text 
    FROM NOTEEVENTS N2, 
    (SELECT subject_id, MIN(chartdate) chartdate_start 
     FROM NOTEEVENTS 
     WHERE subject_id = 283 
     AND category != 'Discharge summary' 
    GROUP BY subject_id) N1 
    WHERE N2.subject_id = N1.subject_id 
    and n2.chartdate < add_hours(n1.chartdate_start, 12) 
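
The same registered UDF can also be called from the DataFrame API. A minimal sketch, assuming a hypothetical joined DataFrame notes that already contains the chartdate and chartdate_start columns from the query above:

import org.apache.spark.sql.functions.{callUDF, col, lit}

// notes is a hypothetical joined DataFrame with chartdate and chartdate_start
val within12Hours = notes.filter(
    col("chartdate") < callUDF("add_hours", col("chartdate_start"), lit(12))
)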

You can also compute the new date with the unix_timestamp function. In my opinion this is less readable, but it follows the same idea as the other answer by Anton Okolnychyi:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{from_unixtime, unix_timestamp}

// column-based helper: convert to seconds, add the hour offset, convert back
val addHours = (datetime: Column, hours: Column) => {
    from_unixtime(unix_timestamp(datetime) + hours * 60 * 60)
}
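
A sketch of how this helper could be applied in a filter, again against a hypothetical joined DataFrame notes; since from_unixtime returns a string column, it is cast back to a timestamp before comparing:

import org.apache.spark.sql.functions.lit

val within12Hours = notes.filter(
    notes("chartdate") < addHours(notes("chartdate_start"), lit(12)).cast("timestamp")
)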
+0

@Ahsan great :) If you ever run into performance problems, take another look at Anton Okolnychyi's answer; native functions can potentially be pushed down. In this case, though, I don't think the predicate can be pushed down anyway, since it only runs over a few tables, so the UDF should be fine and is easier to read. –

4

How about using the unix_timestamp() function to convert the datetime into seconds and then adding hours * 60 * 60 to it? This is something Spark's whole-stage code generation can handle.

Then your condition would look like this:

unix_timestamp(n2.chartdate) < (unix_timestamp(n1.chartdate_start) + 12 * 60 * 60)
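
For illustration, here is a sketch of the original query with the PostgreSQL interval replaced by this condition (wrapped in sqlContext.sql, assuming NOTEEVENTS is registered as a table as in the question):

val result = sqlContext.sql("""
    SELECT N1.subject_id, '12-HOUR' AS notes_period, N1.chartdate_start, N2.chartdate, N2.text
    FROM NOTEEVENTS N2,
        (SELECT subject_id, MIN(chartdate) chartdate_start
         FROM NOTEEVENTS
         WHERE subject_id = 283
         AND category != 'Discharge summary'
         GROUP BY subject_id) N1
    WHERE N2.subject_id = N1.subject_id
    AND unix_timestamp(N2.chartdate) < unix_timestamp(N1.chartdate_start) + 12 * 60 * 60
""")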

+0

I have tried this approach and it is a perfectly workable and nice solution. I was just wondering whether there is something like add_months for hours. Thanks. – Ahsan

2

The same as in PostgreSQL, you can use INTERVAL. In SQL:

spark.sql("""SELECT current_timestamp() AS now, 
        current_timestamp() + INTERVAL 12 HOURS AS now_plus_twelve""" 
).show(false) 
+-----------------------+-----------------------+ 
|now                    |now_plus_twelve        | 
+-----------------------+-----------------------+ 
|2017-12-14 10:49:15.115|2017-12-14 22:49:15.115| 
+-----------------------+-----------------------+ 
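
The question also asks for 24- and 48-hour windows; the same syntax extends naturally (a sketch, reusing the current_timestamp example above):

spark.sql("""SELECT current_timestamp() AS now, 
                    current_timestamp() + INTERVAL 24 HOURS AS now_plus_24, 
                    current_timestamp() + INTERVAL 48 HOURS AS now_plus_48""" 
).show(false) 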

隨着Dataset - 斯卡拉:

import org.apache.spark.sql.functions.{current_timestamp, expr} 

spark.range(1) 
    .select(
        current_timestamp() as "now", 
        current_timestamp() + expr("INTERVAL 12 HOURS") as "now_plus_twelve" 
    ).show(false) 
+-----------------------+-----------------------+ 
|now                    |now_plus_twelve        | 
+-----------------------+-----------------------+ 
|2017-12-14 10:56:59.185|2017-12-14 22:56:59.185| 
+-----------------------+-----------------------+ 
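
The interval expression also works inside a filter; a sketch assuming a hypothetical Dataset notes with the chartdate and chartdate_start columns from the question:

import org.apache.spark.sql.functions.expr

// timestamp + interval comparison, no UDF needed
val within12Hours = notes.filter(
    notes("chartdate") < notes("chartdate_start") + expr("INTERVAL 12 HOURS")
)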

Python:

from pyspark.sql.functions import current_timestamp, expr 

(spark.range(1).select(
    current_timestamp().alias("now"), 
    (current_timestamp() + expr("INTERVAL 12 HOURS")).alias("now_plus_twelve"))).show(truncate=False)