2017-06-19

Aggregating JSON objects and converting string timestamps to dates

I have JSON lines that look like the following:

[{"time":"2017-03-23T12:23:05","user":"randomUser","action":"sleeping"}]
[{"time":"2017-03-23T12:24:05","user":"randomUser","action":"sleeping"}]
[{"time":"2017-03-23T12:33:05","user":"randomUser","action":"sleeping"}]
[{"time":"2017-03-23T15:33:05","user":"randomUser2","action":"eating"}]
[{"time":"2017-03-23T15:33:06","user":"randomUser2","action":"eating"}]

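So I have two questions. First, all the times are stored as strings inside my DataFrame. I believe I have to convert them to dates before I can sort them?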
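As a small illustration of the first question (plain Scala with java.time, no Spark; the object name `ParseAndSort` is hypothetical), parsing the strings yields real date-time values that sort chronologically. For this particular fixed-width ISO-8601 format without a time zone, sorting the raw strings lexicographically already gives the same order:

```scala
import java.time.LocalDateTime

object ParseAndSort {
  // Sample "time" values copied from the JSON lines above, deliberately out of order
  val raw: List[String] = List(
    "2017-03-23T15:33:05",
    "2017-03-23T12:24:05",
    "2017-03-23T12:23:05"
  )

  // LocalDateTime.parse accepts ISO-8601 local date-times such as "2017-03-23T12:23:05"
  val parsed: List[LocalDateTime] = raw.map(LocalDateTime.parse(_))

  // Chronological sort on the parsed values
  val sortedParsed: List[LocalDateTime] = parsed.sortWith((a, b) => a.isBefore(b))

  // Plain string sort, then parse: same order, because every value has the same fixed-width format
  val sameOrder: Boolean = raw.sorted.map(LocalDateTime.parse(_)) == sortedParsed

  def main(args: Array[String]): Unit = {
    sortedParsed.foreach(println)
    println(sameOrder)
  }
}
```

Converting is still worthwhile for the second question, because the 5-minute rounding needs arithmetic on the time value, not on the text.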

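Second, I need to aggregate this data in 5-minute intervals. For example, everything that happened from 2017-03-23T12:20:00 to 2017-03-23T12:24:59 needs to be aggregated and treated as the 2017-03-23T12:20:00 timestamp.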
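Stated as a formula (a sketch that assumes each time is represented as $t$, a whole number of seconds since the Unix epoch), every record falls into the bucket that starts at

$$
t_{\text{bucket}} = t - (t \bmod 300) = 300 \left\lfloor \frac{t}{300} \right\rfloor,
$$

so every second from 12:20:00 through 12:24:59 maps to 12:20:00, and 12:25:00 starts the next bucket.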

The expected output is:


Thanks

Answer


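You can convert the StringType column into a TimestampType column by casting it. Then cast the timestamp to a numeric type (seconds since the epoch) so that "rounding" down to the most recent 5-minute interval becomes simple arithmetic, and group by that value (along with all the other columns):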

// importing SparkSession's implicits (for the $"..." column syntax)
import spark.implicits._
import org.apache.spark.sql.types.{LongType, TimestampType}

// Use casting to convert the String column into a Timestamp:
val withTime = df.withColumn("time", $"time" cast TimestampType)

// Cast to epoch seconds, subtract the remainder modulo 300 s (5 minutes) to
// round down to the most recent 5-minute boundary, cast back to Timestamp,
// and group by all columns:
val result = withTime.withColumn("time", $"time" cast LongType)
    .withColumn("time", ($"time" - ($"time" % (5 * 60))) cast TimestampType)
    .groupBy("time", "user", "action").count()

result.show(truncate = false)
// +---------------------+-----------+--------+-----+
// |time                 |user       |action  |count|
// +---------------------+-----------+--------+-----+
// |2017-03-23 12:20:00.0|randomUser |sleeping|2    |
// |2017-03-23 15:30:00.0|randomUser2|eating  |2    |
// |2017-03-23 12:30:00.0|randomUser |sleeping|1    |
// +---------------------+-----------+--------+-----+
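To check the rounding arithmetic without a Spark cluster, here is a plain-Scala sketch (no Spark) that applies the same `t - (t % 300)` step to the sample rows and counts per (bucket, user, action), mirroring `groupBy("time", "user", "action").count()`. The `Event` case class and the use of UTC for the epoch-second round trip are assumptions for illustration only; Spark's cast uses the session time zone instead.

```scala
import java.time.{LocalDateTime, ZoneOffset}

object FiveMinuteBuckets {
  // Hypothetical record type for one JSON line from the question
  final case class Event(time: String, user: String, action: String)

  val events: List[Event] = List(
    Event("2017-03-23T12:23:05", "randomUser", "sleeping"),
    Event("2017-03-23T12:24:05", "randomUser", "sleeping"),
    Event("2017-03-23T12:33:05", "randomUser", "sleeping"),
    Event("2017-03-23T15:33:05", "randomUser2", "eating"),
    Event("2017-03-23T15:33:06", "randomUser2", "eating")
  )

  val BucketSeconds: Long = 5 * 60L

  // Same arithmetic as the Spark expression: epoch seconds minus the remainder modulo 300
  def bucketStart(time: String): LocalDateTime = {
    val t = LocalDateTime.parse(time).toEpochSecond(ZoneOffset.UTC)
    LocalDateTime.ofEpochSecond(t - (t % BucketSeconds), 0, ZoneOffset.UTC)
  }

  // Count events per (bucket start, user, action)
  val counts: Map[(LocalDateTime, String, String), Int] =
    events
      .groupBy(e => (bucketStart(e.time), e.user, e.action))
      .map { case (key, group) => key -> group.size }

  def main(args: Array[String]): Unit =
    counts.toList
      .sortBy { case ((start, _, _), _) => start.toEpochSecond(ZoneOffset.UTC) }
      .foreach { case ((start, user, action), n) => println(s"$start $user $action $n") }
}
```

Because 300 divides 86,400, the buckets line up with whole 5-minute marks of the clock, so 12:23:05 and 12:24:05 both land in the 12:20 bucket, as in the Spark output.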