
I want to implement the following PySpark lag function (with a column-based offset):

lag(column1, datediff(column2, column3)).over(window)

The offset is dynamic. I have also tried using a UDF, but it did not work.

Any ideas on how to implement the above?


Check the answer here: https://stackoverflow.com/questions/36725353/applying-a-window-function-to-calculate-differences-in-pyspark; otherwise add more details about the problem and your dataset. – MedAli

Answer


The count argument of the lag function takes an integer, not a column object:

psf.lag(col, count=1, default=None) 

It therefore cannot be a "dynamic" value. Instead, you can build the lag into a column and then join the table with itself.
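
To make the constraint concrete, here is a small illustration of my own (assuming pyspark.sql.functions is imported as psf and a SparkSession is active), showing which offsets lag accepts:

# OK: the offset is a literal Python int, fixed when the expression is built.
fixed = psf.lag("date", 2)

# Not OK: a column-valued offset such as datediff(...) is rejected,
# because the count is handed to the JVM as a plain integer.
# dynamic = psf.lag("date", psf.datediff("date2", "date3"))  # raises an error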

First, let's create the dataframe:

df = spark.createDataFrame(
    sc.parallelize(
     [[1, "2011-01-01"], [1, "2012-01-01"], [2, "2013-01-01"], [1, "2014-01-01"]] 
    ), 
    ["int", "date"] 
) 

We need to number the rows:

from pyspark.sql import Window 
import pyspark.sql.functions as psf 
df = df.withColumn(
    "id", 
    psf.monotonically_increasing_id() 
) 
w = Window.orderBy("id") 
df = df.withColumn("rn", psf.row_number().over(w)) 
    +---+----------+-----------+---+
    |int|      date|         id| rn|
    +---+----------+-----------+---+
    |  1|2011-01-01|17179869184|  1|
    |  1|2012-01-01|42949672960|  2|
    |  2|2013-01-01|68719476736|  3|
    |  1|2014-01-01|94489280512|  4|
    +---+----------+-----------+---+
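
One aside of my own, not part of the original answer: Window.orderBy without a partitionBy pulls every row into a single partition, and Spark logs a warning about it. If that is a concern, the rows can be numbered through the RDD API instead, applied to the original two-column df:

# Alternative numbering via zipWithIndex, avoiding the global window.
# Each Row behaves like a tuple, so we append a 1-based index to it.
df_rn = (
    df.rdd.zipWithIndex()
    .map(lambda pair: pair[0] + (pair[1] + 1,))
    .toDF(df.columns + ["rn"])
)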

Now we build the lag:

df1 = df.select(
    "int", 
    df.date.alias("date1"), 
    (df.rn - df.int).alias("rn") 
) 
df2 = df.select(
    df.date.alias("date2"), 
    'rn' 
) 
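
To see the trick, here is what df1 holds for the sample data (values derived from the table above): each row's join key is rn - int, so it will be matched with the row int positions earlier. The rn = 0 row has no partner in df2 and drops out of the inner join.

df1.show()
# +---+----------+---+
# |int|     date1| rn|
# +---+----------+---+
# |  1|2011-01-01|  0|
# |  1|2012-01-01|  1|
# |  2|2013-01-01|  1|
# |  1|2014-01-01|  3|
# +---+----------+---+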

Finally, we can join the two to compute the date difference:

df1.join(df2, "rn", "inner").withColumn(
    "date_diff", 
    psf.datediff("date1", "date2") 
).drop("rn") 

    +---+----------+----------+---------+
    |int|     date1|     date2|date_diff|
    +---+----------+----------+---------+
    |  1|2012-01-01|2011-01-01|      365|
    |  2|2013-01-01|2011-01-01|      731|
    |  1|2014-01-01|2013-01-01|      365|
    +---+----------+----------+---------+
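
For convenience, here is the whole pipeline again as one self-contained sketch (my own consolidation of the steps above, assuming only a working Spark install):

from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as psf

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [[1, "2011-01-01"], [1, "2012-01-01"], [2, "2013-01-01"], [1, "2014-01-01"]],
    ["int", "date"],
)

# Enumerate the rows: monotonically_increasing_id gives a sortable key,
# row_number turns it into a dense 1..n index.
df = df.withColumn("id", psf.monotonically_increasing_id())
df = df.withColumn("rn", psf.row_number().over(Window.orderBy("id"))).drop("id")

# The left copy carries the dynamic offset rn - int as its join key.
df1 = df.select("int", df.date.alias("date1"), (df.rn - df.int).alias("rn"))
df2 = df.select(df.date.alias("date2"), "rn")

result = (
    df1.join(df2, "rn", "inner")
    .withColumn("date_diff", psf.datediff("date1", "date2"))
    .drop("rn")
)
result.show()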