A Window function should help you with your first requirement, filter will take care of the second, and you can satisfy your third requirement by extracting the time from the datetime values.
Given a dataframe as
+-----+-------------------+
|state|timestamp |
+-----+-------------------+
|0 |Sun Aug 13 10:58:44|
|1 |Sun Aug 13 11:59:44|
|1 |Sun Aug 13 12:50:43|
|1 |Sun Aug 13 13:00:44|
|0 |Sun Aug 13 13:58:42|
|0 |Sun Aug 13 14:00:41|
|0 |Sun Aug 13 14:30:45|
|0 |Sun Aug 13 14:58:46|
|1 |Sun Aug 13 15:00:47|
|0 |Sun Aug 13 16:00:49|
+-----+-------------------+
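For reference, the dataframe above could be built along these lines. This is only a sketch: it assumes state is an Int, timestamp is kept as a plain String, and a SparkSession named spark is in scope.

import spark.implicits._

// hypothetical construction of the sample data shown above
val df = Seq(
  (0, "Sun Aug 13 10:58:44"), (1, "Sun Aug 13 11:59:44"),
  (1, "Sun Aug 13 12:50:43"), (1, "Sun Aug 13 13:00:44"),
  (0, "Sun Aug 13 13:58:42"), (0, "Sun Aug 13 14:00:41"),
  (0, "Sun Aug 13 14:30:45"), (0, "Sun Aug 13 14:58:46"),
  (1, "Sun Aug 13 15:00:47"), (0, "Sun Aug 13 16:00:49")
).toDF("state", "timestamp")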
Doing what I explained above should help you. The following should solve your first and second requirements:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

df.withColumn("temp", lag("state", 1).over(Window.orderBy("timestamp")))       // previous row's state; ordering by the raw string works here because every row shares the same date prefix
  .withColumn("temp", when(col("temp").isNull, lit(0)).otherwise(col("temp"))) // the first row has no predecessor, so treat it as coming from state 0
  .filter(col("state") =!= col("temp"))                                        // keep only the rows where the state changed
you should have
+-----+-------------------+----+
|state|timestamp |temp|
+-----+-------------------+----+
|1 |Sun Aug 13 11:59:44|0 |
|0 |Sun Aug 13 13:58:42|1 |
|1 |Sun Aug 13 15:00:47|0 |
|0 |Sun Aug 13 16:00:49|1 |
+-----+-------------------+----+
Now for your third requirement, you need to find a way to extract the time from the timestamp column and then do something like the following:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

df.withColumn("temp", lag("state", 1).over(Window.orderBy("timestamp")))
  .withColumn("temp", when(col("temp").isNull, lit(0)).otherwise(col("temp")))
  .filter(col("state") =!= col("temp"))
  .select(collect_list(col("timestamp")).as("time"))   // gather the change timestamps into a single array
  // pair the timestamps up as "off - on" intervals; the indices assume exactly four state changes, as in the example
  .withColumn("time", concat_ws(" + ", concat_ws(" - ", $"time"(1), $"time"(0)), concat_ws(" - ", $"time"(3), $"time"(2))))
you should have
+-------------------------------------------------------------------------------------+
|time |
+-------------------------------------------------------------------------------------+
|Sun Aug 13 13:58:42 - Sun Aug 13 11:59:44 + Sun Aug 13 16:00:49 - Sun Aug 13 15:00:47|
+-------------------------------------------------------------------------------------+
I hope the answer is helpful, except for the extraction of the time values from the timestamp column.
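In case it helps, here is one way that extraction could look. This is only a sketch, not a tested solution: it assumes the timestamps all fall in 2017 (so the day-of-week prefix can be dropped and a year prepended for parsing), that there are exactly four state changes as in the example, and, like the snippet above, that collect_list sees the rows in timestamp order.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Hypothetical helper: turn "Sun Aug 13 13:58:42" into epoch seconds.
// substring(_, 5, 15) drops the "Sun " prefix, and a year is prepended
// because the data carries none (2017 is assumed here).
def toSeconds(c: org.apache.spark.sql.Column) =
  unix_timestamp(concat(lit("2017 "), substring(c, 5, 15)), "yyyy MMM dd HH:mm:ss")

df.withColumn("temp", lag("state", 1).over(Window.orderBy("timestamp")))
  .withColumn("temp", when(col("temp").isNull, lit(0)).otherwise(col("temp")))
  .filter(col("state") =!= col("temp"))
  .select(collect_list(toSeconds(col("timestamp"))).as("t"))
  // total seconds spent in state 1: (t1 - t0) + (t3 - t2)
  .select((col("t")(1) - col("t")(0) + col("t")(3) - col("t")(2)).as("secondsInState1"))
  .show()

For the sample data this should come to 7138 + 3602 = 10740 seconds.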
3 × "I need", 0 × "I have tried" – Axalix
@indra: what have you tried so far... and what failed? –