2016-01-21 43 views
2

鑑於此數據幀DFSparkSQL前,從數據幀後分組

+-----------+--------------------+-------------+-------+ 
|CustNumb |  PurchaseDate|  price| activeFlag| 
+-----------+--------------------+-------------+-------+ 
|   3|2013-07-17 00:00:...|   17.9| 0| 
|   3|2013-08-27 00:00:...|  61.13| 0| 
|   3|2013-08-28 00:00:...|  25.07| 1| 
|   3|2013-08-29 00:00:...|  24.23| 0| 
|   3|2013-09-06 00:00:...|   3.94| 0| 
|   20|2013-02-28 00:00:...|  354.64| 0| 
|   20|2013-04-07 00:00:...|   15.0| 0| 
|   20|2013-05-10 00:00:...|  545.0| 0| 
|   28|2013-02-17 00:00:...|  190.0| 0| 
|   28|2013-04-08 00:00:...|   20.0| 0| 
|   28|2013-04-16 00:00:...|   89.0| 0| 
|   28|2013-05-18 00:00:...|  260.0| 0| 
|   28|2013-06-06 00:00:...|  586.57| 1| 
|   28|2013-06-09 00:00:...|  250.0| 0| 

我希望得到的結果返回的2行平均價格的,它會找出無效之前訂購的購買日期後,當經過讀取行標誌'1'。這裏是我尋找的結果:

+-----------+--------------------+-------------+-------+---------------+ 
|CustNumb |  PurchaseDate|  price| activeFlag| OutputVal | 
+-----------+--------------------+-------------+-------+------------+ 
|   3|2013-07-17 00:00:...|   17.9| 0| 17.9 
|   3|2013-08-27 00:00:...|  61.13| 0| 61.13 
|   3|2013-08-28 00:00:...|  25.07| 1| 26.8 (avg of 2 prices before and 2 after) 
|   3|2013-08-29 00:00:...|  24.23| 0| 24.23 
|   3|2013-09-06 00:00:...|   3.94| 0| 3.94 

|   20|2013-02-28 00:00:...|  354.64| 0| 354.64 
|   20|2013-04-07 00:00:...|   15.0| 0| 15.0 
|   20|2013-05-10 00:00:...|  545.0| 0| 545.0 

|   28|2013-02-17 00:00:...|  190.0| 0| 190.0 
|   28|2013-04-08 00:00:...|   20.0| 0| 20.0 
|   28|2013-04-16 00:00:...|   89.0| 0| 89.0 
|   28|2013-05-18 00:00:...|  260.0| 0| 260.0 
|   28|2013-06-06 00:00:...|  586.57| 1| 199.6 (avg of 2 prices before and 1 after) 
|   28|2013-06-09 00:00:...|  250.0| 0| 250 

在用於custNum 3和28上面的例子中,我有activeFlag 1,所以需要計算的2行之前平均和如果它與相同custNumb存在後..

我想用在數據幀的窗口功能,但無法得到任何好的想法,我爲很新的火花編程

val w = Window.partitionBy("CustNumb").orderBy("PurchaseDate") 

我怎樣才能做到這一點,是可以實現的,解決這個火花通過窗口功能或任何更好的方式來做到這一點?

回答

0

如果你已經有了一個窗口簡單的條件這樣的應該只是罰款:

val cond = ($"activeFlag" === 1) && (lag($"activeFlag", 1).over(w) === 0) 

// Windows covering rows before and after 
val before = w.rowsBetween(-2, -1) 
val after = w.rowsBetween(1, 2) 

// Expression with sum of rows and number of rows 
val sumPrice = sum($"price").over(before) + sum($"price").over(after) 
val countPrice = sum($"ones_").over(before) + sum($"ones_").over(after) 

val expr = when(cond, sumPrice/countPrice).otherwise($"price") 

df.withColumn("ones_", lit(1)).withColumn("outputVal", expr) 
+1

Zero323。感謝您的解決方案,但是您能否解釋查找avgprice的滯後函數?什麼是滯後($「price,1).over(w)返回以及它如何在前後花費2行?我需要驗證相同的custNum是否存在於activeflag行之上和之下的2行,然後得到平均值 – rk1113

+0

其實它不是。我不知道怎麼看錯了你的描述。請檢查更新。 – zero323

+0

感謝您的解決方案,雖然!!我修改了您的舊解決方案,並得到它的工作以及..我貼在下面,請讓我知道如果這需要修復!! – rk1113

0

感謝Zero323。你搖滾! 這是基於你的幫助,我的代碼塊我修改得到什麼,我期待在結果數據:

val windw = Window.partitionBy("CustNumb").orderBy("PurchaseDate") 
val cond = ($"activeFlag" === 1) //&& (lag($"activeFlag", 1).over(win) === 0) 
val avgprice = (lag($"price", 1).over(windw) + lag($"price", 2).over(windw) + lead($"price", 1).over(windw) + lead($"price", 2).over(windw))/4.0 
val expr = when(cond, avgprice).otherwise($"price") 
val finalresult = df.withColumn("newPrice", expr) 

我唯一搞清楚的是,如果activeflag = 1的行存在正上方,然後我想在activeflag = 1的行上方多出一行。我會嘗試更新,如果我找到解決方法來獲取此。

+0

看起來幾乎正確,但它不包括可能會出現一個較短的窗口(例如下面只有一個),我在開始的時候也錯過了,最簡單的方法就是使用'avg($「price」)。over(w.rowsBetween(-2,2) )'但它考慮到當前行。 – zero323

+0

感謝您的建議..它的作品完美! – rk1113