2017-07-24 21 views
0

我有以下數據框顯示購買收入。Spark - 窗口與遞歸? - 有條件地傳播行的值

+-------+--------+-------+ 
|user_id|visit_id|revenue| 
+-------+--------+-------+ 
|  1|  1|  0| 
|  1|  2|  0| 
|  1|  3|  0| 
|  1|  4| 100| 
|  1|  5|  0| 
|  1|  6|  0| 
|  1|  7| 200| 
|  1|  8|  0| 
|  1|  9|  10| 
+-------+--------+-------+ 

最終,我希望新列purch_revenue顯示購買在每一行產生的收入。 作爲一種解決方法,我還嘗試引入購買標識purch_id,每次購買時都會增加。所以這只是作爲參考。

+-------+--------+-------+-------------+--------+ 
|user_id|visit_id|revenue|purch_revenue|purch_id| 
+-------+--------+-------+-------------+--------+ 
|  1|  1|  0|   100|  1| 
|  1|  2|  0|   100|  1| 
|  1|  3|  0|   100|  1| 
|  1|  4| 100|   100|  1| 
|  1|  5|  0|   100|  2| 
|  1|  6|  0|   100|  2| 
|  1|  7| 200|   100|  2| 
|  1|  8|  0|   100|  3| 
|  1|  9|  10|   100|  3| 
+-------+--------+-------+-------------+--------+ 

我試圖使用lag/lead功能是這樣的:

user_timeline = Window.partitionBy("user_id").orderBy("visit_id") 
find_rev = fn.when(fn.col("revenue") > 0,fn.col("revenue"))\ 
    .otherwise(fn.lead(fn.col("revenue"), 1).over(user_timeline)) 
df.withColumn("purch_revenue", find_rev) 

這個複製的收入列,如果revenue > 0並且還通過一排拉起來。很明顯,我可以將它連接成一個有限的N,但這不是一個解決方案。

  • 有沒有辦法將這個遞歸應用到revenue > 0
  • 或者,有沒有一種方法來增加基於條件的值?我試圖想出一個辦法來做到這一點,但努力找到一個。

回答

4

窗口函數不支持遞歸,但在這裏不是必需的。這種類型的sesionization可以用累積和易於操作:

from pyspark.sql.functions import col, sum, when, lag 
from pyspark.sql.window import Window 

w = Window.partitionBy("user_id").orderBy("visit_id") 
purch_id = sum(lag(when(
    col("revenue") > 0, 1).otherwise(0), 
    1, 0 
).over(w)).over(w) + 1 

df.withColumn("purch_id", purch_id).show() 
+-------+--------+-------+--------+ 
|user_id|visit_id|revenue|purch_id| 
+-------+--------+-------+--------+ 
|  1|  1|  0|  1| 
|  1|  2|  0|  1| 
|  1|  3|  0|  1| 
|  1|  4| 100|  1| 
|  1|  5|  0|  2| 
|  1|  6|  0|  2| 
|  1|  7| 200|  2| 
|  1|  8|  0|  3| 
|  1|  9|  10|  3| 
+-------+--------+-------+--------+ 
+1

我只是最後得出了同樣的結論。謝謝! – Hans