2016-08-10 202 views
1

利用窗口函數來過濾數據,我有以下數據:如何火花

rowid uid time code 
    1 1  5 a 
    2 1  6 b 
    3 1  7 c 
    4 2  8 a 
    5 2  9 c 
    6 2  9 c 
    7 2  10 c 
    8 2  11 a 
    9 2  12 c 

現在我想篩選這樣的數據,我可以刪除這些行6,7爲特定UID我想在代碼僅保存一行與值「C」

所以預期的數據應該是:

rowid uid time code 
    1 1  5 a 
    2 1  6 b 
    3 1  7 c 
    4 2  8 a 
    5 2  9 c 
    8 2  11 a 
    9 2  12 c 

我使用的窗函數是這樣的:

val window = Window.partitionBy("uid").orderBy("time") 
val change = ((lag("code", 1).over(window) <=> "c")).cast("int") 

這將幫助我們識別代碼爲'c'的每一行。我可以延長這個過濾掉行,以獲得預期的數據

+0

你能澄清你的要求嗎?對於每個UID,您是否只想爲每個代碼保留一行,或者僅針對代碼「c」,您需要執行此操作? – mattinbits

回答

1

如果你只想刪除行其代碼=「C」(除了第一個每個UID),你可以嘗試以下方法:

val window = Window.partitionBy("uid", "code").orderBy("time") 
val result = df 
    .withColumn("rank", row_number().over(window)) 
    .where(
    (col("code") !== "c") || 
    col("rank") === 1 
) 
    .drop("rank") 

編輯基於新的信息:

val window = Window.partitionBy("uid").orderBy("time") 
val result = df 
    .withColumn("lagValue", coalesce(lag(col("code"), 1).over(window), lit(""))) 
    .where(
    (col("code") !== "c") || 
    (col("lagValue") !== "c") 
) 
    .drop("lagValue") 
+1

在使用上面的代碼時,當我做partitionBy(「uid」,「code」)時,得到的數據集不正確,因爲這會得出以下結果: rowid uid時間碼 – hbabbar

+0

@hbabbar,它爲什麼不正確? –

+0

道歉,錯過上傳整個註釋.. 所以得到的DF是一樣的東西: 使用上面的代碼,當我這樣做 VAL窗口= Window.partitionBy(「UID」,「代碼」)排序依據( 「時間」) df.withColumn( 「等級」,ROW_NUMBER()在(窗)) 所得數據集是不正確,因爲這給出以下結果: rowid的UID時間碼秩 4 2 8 a 2 2 1 6 b 1 3 1 7 c 1 5 2 9 c 1 因此,我在第e grouping on uid – hbabbar