0
我要實現以下pyspark滯後功能(基於列)
lag(column1,datediff(column2,column3)).over(window)
偏移是動態的。我也嘗試過使用UDF,但它不起作用。
關於如何實現上述的任何想法?
我要實現以下pyspark滯後功能(基於列)
lag(column1,datediff(column2,column3)).over(window)
偏移是動態的。我也嘗試過使用UDF,但它不起作用。
關於如何實現上述的任何想法?
的lag
函數的參數count
取整數不是列對象:
psf.lag(col, count=1, default=None)
因此,它不能成爲一個「動態」的值。 相反,您可以在列中構建滯後,然後將其與自身結合。
首先,讓我們來創建數據幀:
df = spark.createDataFrame(
sc.parallelize(
[[1, "2011-01-01"], [1, "2012-01-01"], [2, "2013-01-01"], [1, "2014-01-01"]]
),
["int", "date"]
)
我們要列舉行:
from pyspark.sql import Window
import pyspark.sql.functions as psf
df = df.withColumn(
"id",
psf.monotonically_increasing_id()
)
w = Window.orderBy("id")
df = df.withColumn("rn", psf.row_number().over(w))
+---+----------+-----------+---+
|int| date| id| rn|
+---+----------+-----------+---+
| 1|2011-01-01|17179869184| 1|
| 1|2012-01-01|42949672960| 2|
| 2|2013-01-01|68719476736| 3|
| 1|2014-01-01|94489280512| 4|
+---+----------+-----------+---+
我們建立滯後:
df1 = df.select(
"int",
df.date.alias("date1"),
(df.rn - df.int).alias("rn")
)
df2 = df.select(
df.date.alias("date2"),
'rn'
)
最後,我們可以加入他們的行列計算日期差異:
df1.join(df2, "rn", "inner").withColumn(
"date_diff",
psf.datediff("date1", "date2")
).drop("rn")
+---+----------+----------+---------+
|int| date1| date2|date_diff|
+---+----------+----------+---------+
| 1|2012-01-01|2011-01-01| 365|
| 2|2013-01-01|2011-01-01| 731|
| 1|2014-01-01|2013-01-01| 365|
+---+----------+----------+---------+
檢查在這裏的答案:https://stackoverflow.com/questions/36725353/applying-a-window-function-to-calculate-differences-in-pyspark否則把更多的細節問題和數據集 – MedAli