I have written this answer in a somewhat verbose way so that the code is easier to follow. It can be optimized.
Required imports:
import java.time.format.DateTimeFormatter
import java.time.{LocalDate, LocalDateTime}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, TimestampType}
UDF to convert the input string into a valid date format:
val date_transform = udf((date: String) => {
  val dtFormatter = DateTimeFormatter.ofPattern("d-M-y")
  val dt = LocalDate.parse(date, dtFormatter)
  // Zero-pad directly instead of padding with spaces and replacing them afterwards.
  "%04d-%02d-%02d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth)
})
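To check the conversion outside Spark, the same parsing logic can be exercised as a plain function. This is only a minimal sketch: the names DateText and toIso are hypothetical and not part of the answer's code, and it assumes the input always uses the day-month-year layout of the sample data.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object DateText {
  // Same pattern as the UDF: day-month-year, e.g. "10-09-2016".
  private val inputFormat = DateTimeFormatter.ofPattern("d-M-y")

  // Parse the input and re-emit it as ISO yyyy-MM-dd text.
  def toIso(date: String): String =
    LocalDate.parse(date, inputFormat).format(DateTimeFormatter.ISO_LOCAL_DATE)
}
```

For example, DateText.toIso("10-09-2016") yields "2016-09-10", which Spark can then cast to a timestamp.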
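The UDF below follows the "Iterate over dates range" approach and generates the dates that fall between two consecutive rows:
// Declared as a `val` rather than a `def`, so the UDF is created only once.
val fill_dates = udf((start: String, excludedDiff: Int) => {
  val dtFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val fromDt = LocalDateTime.parse(start, dtFormatter)
  // Days strictly between the start date and start + excludedDiff days.
  (1 to (excludedDiff - 1)).map(day => {
    val dt = fromDt.plusDays(day)
    "%04d-%02d-%02d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth)
  })
})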
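The date-range step can also be tried in isolation, without Spark. The sketch below is an illustration under assumptions: GapDates and between are hypothetical names, and the start value is assumed to be a "yyyy-MM-dd HH:mm:ss" string, which is how the timestamp column reaches the UDF above.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object GapDates {
  private val tsFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Dates strictly between `start` and `start + diff days`, as yyyy-MM-dd strings.
  def between(start: String, diff: Int): Seq[String] = {
    val from = LocalDateTime.parse(start, tsFormat)
    // `1 until diff` is empty when diff <= 1, i.e. when there is no gap to fill.
    (1 until diff).map(day => from.plusDays(day).toLocalDate.toString)
  }
}
```

With a start of "2016-09-11 00:00:00" and a diff of 3, this returns the two missing days, 2016-09-12 and 2016-09-13.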
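Set up the sample DataFrame (df):
// Outside spark-shell, `import spark.implicits._` (where spark is your SparkSession)
// is needed for toDF and the $"..." column syntax.
val df = Seq(
  ("10-09-2016", 1),
  ("11-09-2016", 2),
  ("14-09-2016", 0),
  ("16-09-2016", 1),
  ("17-09-2016", 0),
  ("20-09-2016", 2)).toDF("date", "quantity")
  .withColumn("date", date_transform($"date").cast(TimestampType))
  .withColumn("quantity", $"quantity".cast(LongType))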
df.printSchema()
root
 |-- date: timestamp (nullable = true)
 |-- quantity: long (nullable = false)
df.show()
+-------------------+--------+
|               date|quantity|
+-------------------+--------+
|2016-09-10 00:00:00|       1|
|2016-09-11 00:00:00|       2|
|2016-09-14 00:00:00|       0|
|2016-09-16 00:00:00|       1|
|2016-09-17 00:00:00|       0|
|2016-09-20 00:00:00|       2|
+-------------------+--------+
Create a temporary DataFrame (tempDf) to union with df:
val w = Window.orderBy($"date")
val tempDf = df.withColumn("diff", datediff(lead($"date", 1).over(w), $"date"))
  .filter($"diff" > 1) // keep only gaps longer than one day; these need generated dates
  .withColumn("next_dates", fill_dates($"date", $"diff"))
  .withColumn("quantity", lit(0L)) // Long literal, so the type matches df's quantity column
  .withColumn("date", explode($"next_dates"))
  .withColumn("date", $"date".cast(TimestampType))
tempDf.show(false)
+-------------------+--------+----+------------------------+
|date               |quantity|diff|next_dates              |
+-------------------+--------+----+------------------------+
|2016-09-12 00:00:00|0       |3   |[2016-09-12, 2016-09-13]|
|2016-09-13 00:00:00|0       |3   |[2016-09-12, 2016-09-13]|
|2016-09-15 00:00:00|0       |2   |[2016-09-15]            |
|2016-09-18 00:00:00|0       |3   |[2016-09-18, 2016-09-19]|
|2016-09-19 00:00:00|0       |3   |[2016-09-18, 2016-09-19]|
+-------------------+--------+----+------------------------+
Now union the two DataFrames:
val result = df.union(tempDf.select("date", "quantity"))
  .orderBy("date")
result.show()
+-------------------+--------+
|               date|quantity|
+-------------------+--------+
|2016-09-10 00:00:00|       1|
|2016-09-11 00:00:00|       2|
|2016-09-12 00:00:00|       0|
|2016-09-13 00:00:00|       0|
|2016-09-14 00:00:00|       0|
|2016-09-15 00:00:00|       0|
|2016-09-16 00:00:00|       1|
|2016-09-17 00:00:00|       0|
|2016-09-18 00:00:00|       0|
|2016-09-19 00:00:00|       0|
|2016-09-20 00:00:00|       2|
+-------------------+--------+
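As a cross-check of the result, the same gap-filling idea can be sketched on plain Scala collections: pair each row with the next one (the role played by lead above), measure the gap in days, and emit a zero-quantity row for each missing day. This is not the Spark code from the answer; FillGaps and fill are hypothetical names, and the input is assumed to be sorted by date with no duplicate dates.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

object FillGaps {
  // Insert (date, 0) for every day missing between consecutive observed dates.
  // Assumes `rows` is sorted ascending by date and contains no duplicate dates.
  def fill(rows: Seq[(LocalDate, Long)]): Seq[(LocalDate, Long)] = {
    // Pair each row with its successor, like lead(date, 1) over an ordered window.
    val gaps = rows.zip(rows.drop(1)).flatMap { case ((from, _), (to, _)) =>
      val diff = ChronoUnit.DAYS.between(from, to)
      // Empty when diff <= 1, so adjacent days add nothing.
      (1L until diff).map(d => (from.plusDays(d), 0L))
    }
    // Equivalent of the union followed by orderBy("date").
    (rows ++ gaps).sortBy(_._1.toEpochDay)
  }
}
```

Feeding it the six sample rows from 2016-09-10 to 2016-09-20 produces eleven rows, one per day, with quantity 0 on the five inserted days.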