2017-10-12

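Fill in missing dates in a Spark DataFrame column

I have a Spark DataFrame with a column "date" of type timestamp and a column "quantity" of type long. For each date I have some quantity value. The dates are sorted in ascending order, but some dates are missing. For example, the current DF: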

Date  | Quantity 
10-09-2016 | 1 
11-09-2016 | 2 
14-09-2016 | 0 
16-09-2016 | 1 
17-09-2016 | 0 
20-09-2016 | 2 

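As you can see, the DF is missing some dates, such as 12-09-2016 and 13-09-2016. I want to put 0 in the quantity field for those missing dates, so that the resulting DF looks like this: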

Date  | Quantity 
10-09-2016 | 1 
11-09-2016 | 2 
12-09-2016 | 0 
13-09-2016 | 0 
14-09-2016 | 0 
15-09-2016 | 0 
16-09-2016 | 1 
17-09-2016 | 0 
18-09-2016 | 0 
19-09-2016 | 0 
20-09-2016 | 2 

Any help or suggestions would be appreciated. Thanks in advance. Please note that I am coding in Scala.

Answer

I have written this answer in a somewhat verbose way so that the code is easy to follow; it can be optimized.

Required imports

import java.time.format.DateTimeFormatter
import java.time.{LocalDate, LocalDateTime}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, TimestampType}
import spark.implicits._ // for toDF and $"..."; assumes a SparkSession named `spark`, as in spark-shell

UDF to convert the dd-MM-yyyy strings into a date format Spark can cast to a timestamp

val date_transform = udf((date: String) => {
    // Parse day-month-year strings such as "10-09-2016"
    val dtFormatter = DateTimeFormatter.ofPattern("d-M-y")
    val dt = LocalDate.parse(date, dtFormatter)
    // Emit zero-padded "yyyy-MM-dd", which Spark can cast to TimestampType
    "%04d-%02d-%02d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth)
})
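Outside Spark, the parsing step can be checked with plain java.time. This is a minimal sketch, assuming the input is always day-month-year separated by dashes; `toIsoDate` is a hypothetical helper name. `LocalDate.toString` already prints the zero-padded ISO form `yyyy-MM-dd`, so no manual padding is needed. (On Spark 2.2 or later, the built-in `to_timestamp($"date", "dd-MM-yyyy")` should do the same conversion without a UDF.)

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical helper mirroring date_transform: "dd-MM-yyyy" -> "yyyy-MM-dd"
def toIsoDate(date: String): String = {
  val dtFormatter = DateTimeFormatter.ofPattern("dd-MM-yyyy")
  // LocalDate.toString always prints ISO-8601 (yyyy-MM-dd), zero-padded
  LocalDate.parse(date, dtFormatter).toString
}

println(toIsoDate("10-09-2016")) // 2016-09-10
```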

UDF to iterate over a range of dates

val fill_dates = udf((start: String, excludedDiff: Int) => {
    // The timestamp column arrives in its string form, e.g. "2016-09-10 00:00:00"
    val dtFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
    val fromDt = LocalDateTime.parse(start, dtFormatter)
    // The (excludedDiff - 1) days strictly between `start` and the next existing date
    (1 until excludedDiff).map(day => {
        val dt = fromDt.plusDays(day)
        "%04d-%02d-%02d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth)
    })
})
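The core of fill_dates is ordinary date arithmetic and can be tried without Spark. A minimal sketch with a hypothetical `missingDays` helper, assuming `diff` is the day count that `datediff` returns between a date and the next one: it yields the `diff - 1` days strictly in between, and nothing when the dates are consecutive.

```scala
import java.time.LocalDate

// Hypothetical helper mirroring fill_dates: the days strictly between
// `start` and `start + diff` (both ends excluded)
def missingDays(start: LocalDate, diff: Int): Seq[LocalDate] =
  (1 until diff).map(day => start.plusDays(day.toLong))

println(missingDays(LocalDate.of(2016, 9, 11), 3).mkString(", ")) // 2016-09-12, 2016-09-13
```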

Set up the sample DataFrame (df)

val df = Seq(
     ("10-09-2016", 1), 
     ("11-09-2016", 2), 
     ("14-09-2016", 0), 
     ("16-09-2016", 1), 
     ("17-09-2016", 0), 
     ("20-09-2016", 2)).toDF("date", "quantity") 
     .withColumn("date", date_transform($"date").cast(TimestampType)) 
     .withColumn("quantity", $"quantity".cast(LongType)) 

df.printSchema() 
root 
|-- date: timestamp (nullable = true) 
|-- quantity: long (nullable = false) 


df.show()  
+-------------------+--------+
|               date|quantity|
+-------------------+--------+
|2016-09-10 00:00:00|       1|
|2016-09-11 00:00:00|       2|
|2016-09-14 00:00:00|       0|
|2016-09-16 00:00:00|       1|
|2016-09-17 00:00:00|       0|
|2016-09-20 00:00:00|       2|
+-------------------+--------+

Create a temporary DataFrame (tempDf) holding the missing dates, to union with df

// No partitionBy: Spark moves all rows into a single partition for this window (fine for small data)
val w = Window.orderBy($"date")
val tempDf = df.withColumn("diff", datediff(lead($"date", 1).over(w), $"date"))
    .filter($"diff" > 1) // keep only dates followed by a gap of more than one day
    .withColumn("next_dates", fill_dates($"date", $"diff"))
    .withColumn("quantity", lit(0L)) // long literal, so quantity stays LongType after the union
    .withColumn("date", explode($"next_dates"))
    .withColumn("date", $"date".cast(TimestampType))

tempDf.show(false) 
+-------------------+--------+----+------------------------+
|date               |quantity|diff|next_dates              |
+-------------------+--------+----+------------------------+
|2016-09-12 00:00:00|0       |3   |[2016-09-12, 2016-09-13]|
|2016-09-13 00:00:00|0       |3   |[2016-09-12, 2016-09-13]|
|2016-09-15 00:00:00|0       |2   |[2016-09-15]            |
|2016-09-18 00:00:00|0       |3   |[2016-09-18, 2016-09-19]|
|2016-09-19 00:00:00|0       |3   |[2016-09-18, 2016-09-19]|
+-------------------+--------+----+------------------------+
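The window step (`lead` plus `datediff`, then the `diff > 1` filter) can be mimicked on a plain sorted list by pairing each date with its successor. A sketch on ordinary Scala collections, assuming the dates are already sorted ascending as the question states; `gaps` is a hypothetical name. For the sample data it should find the same diffs (3, 2, 3) that appear in tempDf.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

// Hypothetical analogue of the lead/datediff window step: for each date that is
// followed by a gap of more than one day, return (date, diff in days)
def gaps(sorted: Seq[LocalDate]): Seq[(LocalDate, Long)] =
  sorted.zip(sorted.drop(1)) // pair each date with the next one, like lead($"date", 1)
    .map { case (cur, next) => (cur, ChronoUnit.DAYS.between(cur, next)) }
    .filter { case (_, diff) => diff > 1 }

val dates = Seq(10, 11, 14, 16, 17, 20).map(d => LocalDate.of(2016, 9, d))
gaps(dates).foreach { case (d, diff) => println(s"$d -> $diff") }
```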

Now union the two DataFrames

val result = df.union(tempDf.select("date", "quantity")) 
    .orderBy("date") 

result.show() 
+-------------------+--------+
|               date|quantity|
+-------------------+--------+
|2016-09-10 00:00:00|       1|
|2016-09-11 00:00:00|       2|
|2016-09-12 00:00:00|       0|
|2016-09-13 00:00:00|       0|
|2016-09-14 00:00:00|       0|
|2016-09-15 00:00:00|       0|
|2016-09-16 00:00:00|       1|
|2016-09-17 00:00:00|       0|
|2016-09-18 00:00:00|       0|
|2016-09-19 00:00:00|       0|
|2016-09-20 00:00:00|       2|
+-------------------+--------+
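As a sanity check of the expected result, the fill-with-zero logic can also be written on an in-memory sequence. Note that this is a different approach from the answer's union: it walks every day from the first to the last date and looks each one up, defaulting to 0. It is a sketch on plain Scala collections rather than DataFrames, and `fillMissing` is a hypothetical name.

```scala
import java.time.LocalDate

// Hypothetical in-memory version: visit every day from the earliest to the latest
// date and give days that are absent from `rows` a quantity of 0
def fillMissing(rows: Seq[(LocalDate, Long)]): Seq[(LocalDate, Long)] =
  if (rows.isEmpty) Seq.empty
  else {
    val byDate = rows.toMap
    val first = rows.map(_._1).minBy(_.toEpochDay)
    val last = rows.map(_._1).maxBy(_.toEpochDay)
    Iterator.iterate(first)(_.plusDays(1))
      .takeWhile(d => !d.isAfter(last))
      .map(d => (d, byDate.getOrElse(d, 0L)))
      .toList
  }

val input = Seq(10 -> 1L, 11 -> 2L, 14 -> 0L, 16 -> 1L, 17 -> 0L, 20 -> 2L)
  .map { case (d, q) => (LocalDate.of(2016, 9, d), q) }
fillMissing(input).foreach { case (d, q) => println(s"$d | $q") }
```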