2016-05-14 51 views
0

專欄中,我想與以前的日期值創建一個新的列ID的組(日期減去當前日期)爲以下數據框使用星火窗函數導致創建數據幀

+---+----------+-----+ 
| id|  date|value| 
+---+----------+-----+ 
| a|2015-04-11| 300| 
| a|2015-04-12| 400| 
| a|2015-04-12| 200| 
| a|2015-04-12| 100| 
| a|2015-04-11| 700| 
| b|2015-04-02| 100| 
| b|2015-04-12| 100| 
| c|2015-04-12| 400| 
+---+----------+-----+ 

我已經嘗試過導入窗口功能。

val df1=Seq(("a","2015-04-11",300),("a","2015-04-12",400),("a","2015-04-12",200),("a","2015-04-12",100),("a","2015-04-11",700),("b","2015-04-02",100),("b","2015-04-12",100),("c","2015-04-12",400)).toDF("id","date","value") 

var w1=Window.partitionBy("id").orderBy("date".desc) 
var leadc1=lead(df1("value"),1).over(w1) 
val df2=df1.withColumn("nvalue",leadc1) 

+---+----------+-----+------+             
| id|  date|value|nvalue| 
+---+----------+-----+------+ 
| a|2015-04-12| 400| 200| 
| a|2015-04-12| 200| 100| 
| a|2015-04-12| 100| 300| 
| a|2015-04-11| 300| 700| 
| a|2015-04-11| 700| null| 
| b|2015-04-12| 100| 100| 
| b|2015-04-02| 100| null| 
| c|2015-04-12| 400| null| 
+---+----------+-----+------+ 

但是,正如我們可以看到當我有ID相同的日期「爲」我收到錯誤result.The結果應該是像

+---+----------+-----+------+             
| id|  date|value|nvalue| 
+---+----------+-----+------+ 
| a|2015-04-12| 400| 300| 
| a|2015-04-12| 200| 300| 
| a|2015-04-12| 100| 300| 
| a|2015-04-11| 300| null| 
| a|2015-04-11| 700| null| 
| b|2015-04-12| 100| 100| 
| b|2015-04-02| 100| null| 
| c|2015-04-12| 400| null| 
+---+----------+-----+------+ 

我已經有使用連接,雖然我是一個解決方案使用窗口函數尋找解決方案。

謝謝

回答

0

問題是你有多行具有相同的日期。 lead將取value從下一個在結果集中,而不是下一個日期。因此,當您按降序對日期進行排序時,下一行可能是同一日期。

如何識別用於特定日期的正確值?例如你爲什麼要從(id = a,date = 2015-04-11)取300,而不是700?

要做到這一點與窗口功能,你可能需要做多次通過 - 這將採取最後nvalue並將其應用於同一個id /日期分組中的所有行 - 但我不知道你的行最初是如何排序的。

val df1=Seq(("a","2015-04-11",300),("a","2015-04-12",400),("a","2015-04-12",200),("a","2015-04-12",100),("a","2015-04-11",700),("b","2015-04-02",100),("b","2015-04-12",100),("c","2015-04-12",400)).toDF("id","date","value") 

var w1 = Window.partitionBy("id").orderBy("date".desc) 
var leadc1 = lead(df1("value"),1).over(w1) 
val df2 = df1.withColumn("nvalue",leadc1) 
val w2 = Window.partitionBy("id", "date").orderBy("??? some way to distinguish row ordering") 
val df3 = df1.withColumn("nvalue2", last_value("nvalue").over(w2))