2016-06-10 27 views
0

我在spark數據框中有最終記錄(在連接和過濾之後)。我需要比較連續行的(按鍵分區)列值並基於條件需要更改e_date列值例如:如何在Spark-scala中實現LEAD和LAG

sample table 
    key1 key 2 col1 col2 s_date  e_date 
    a  1  cv1  cv2 2014   2099 
    a  1  cv3  cv2 2016   2099 
    b  2  cv5  cv6 2016   2099 
    b  2  cv5  cv6 2016   2099 

    final table should look like 
    key1 key 2 col1 col2 s_date  e_date 
    a  1  cv1  cv2 2014   2015 (next records s_date-1) 
    a  1  cv3  cv2 2016   2099 
    b  2  cv5  cv6 2016   2099 
  1. 上述表具有複合密鑰,以便KEY1和KEY2是鍵

  2. 通過鍵比較在分區col1和COL2值

  3. 如果任何列具有與新記錄的s_date -1(在最後的表線1,2)

  4. 如果沒有變化,則忽略(在最後的表線3)新的記錄

任何新值結束舊記錄指針斯卡拉火花

回答

4

超前和滯後已經實施:

import org.apache.spark.sql.functions.{lead, lag} 
import org.apache.spark.sql.expressions.Window 

lag('s_date, 1).over(Window.partitionBy('key1, 'key2).orderBy('s_date)) 

檢查Introducing Window Functions in Spark SQL瞭解詳情。

+0

此解決方案僅適用於您的窗口規範中的每個分區足夠小以適合一個執行程序節點。 –