2017-10-06 101 views
0

我有從2017年1月1日開始到2017年1月7日的數據,這是一週需要的每週聚合。我在下面的方式使用窗函數在Spark中使用Windows函數的每週聚合

val df_v_3 = df_v_2.groupBy(window(col("DateTime"), "7 day")) 
     .agg(sum("Value") as "aggregate_sum") 
     .select("window.start", "window.end", "aggregate_sum") 

我有在數據幀的數據作爲

DateTime,value 
    2017-01-01T00:00:00.000+05:30,1.2 
    2017-01-01T00:15:00.000+05:30,1.30 
-- 
    2017-01-07T23:30:00.000+05:30,1.43 
    2017-01-07T23:45:00.000+05:30,1.4 

我得到的輸出爲:

2016-12-29T05:30:00.000+05:30,2017-01-05T05:30:00.000+05:30,723.87 
2017-01-05T05:30:00.000+05:30,2017-01-12T05:30:00.000+05:30,616.74 

這表明,我一天是從29日開始2016年12月,但實際數據是從2017年1月1日開始,爲什麼這個利潤率發生?

+2

您可以添加關於所使用的數據的一些信息?以及你期待什麼類型的結果。 – Shaido

+0

@Shaido我提供了相同的 –

+0

提供的數據似乎沒有包含代碼示例中使用的「DateTime」列。另外,輸出與你期望的輸出有什麼不同? – Shaido

回答

1

對於像這樣翻滾的窗戶,可以設置開始時間的偏移量,更多信息可以在博客here中找到。滑動窗口被使用,但是,通過將「窗口持續時間」和「滑動持續時間」設置爲相同的值,它將與具有開始偏移量的翻滾窗口相同。

的語法如下一樣,

window(column, window duration, sliding duration, starting offset) 

有了自己的價值觀,我發現了,在64小時的偏移將給出2017-01-01 00:00:00的開始時間。

val data = Seq(("2017-01-01 00:00:00",1.0), 
       ("2017-01-01 00:15:00",2.0), 
       ("2017-01-08 23:30:00",1.43)) 
val df = data.toDF("DateTime","value") 
    .withColumn("DateTime", to_timestamp($"DateTime", "yyyy-MM-dd HH:mm:ss")) 

val df2 = df 
    .groupBy(window(col("DateTime"), "1 week", "1 week", "64 hours")) 
    .agg(sum("value") as "aggregate_sum") 
    .select("window.start", "window.end", "aggregate_sum") 

要把這導致數據幀:

+-------------------+-------------------+-------------+ 
|    start|    end|aggregate_sum| 
+-------------------+-------------------+-------------+ 
|2017-01-01 00:00:00|2017-01-08 00:00:00|   3.0| 
|2017-01-08 00:00:00|2017-01-15 00:00:00|   1.43| 
+-------------------+-------------------+-------------+