2017-10-05 49 views
0

我需要更新delta數據的數據框的row_number列。爲Delta數據更新Spark Dataframe的窗口函數row_number列

我已經實現如下的基本負載的ROW_NUMBER:

輸入數據:

val base = List(List("001", "a", "abc"), List("001", "a", "123"),List("003", "c", "456") ,List("002", "b", "dfr"), List("003", "c", "ytr")) 
    .map(row => (row(0), row(1), row(2))) 

    val DS1 = base.toDF("KEY1", "KEY2" ,"VAL") 

    DS1.show() 
+----+----+---+ 
|KEY1|KEY2|VAL| 
+----+----+---+ 
| 001| a|abc| 
| 001| a|123| 
| 003| c|456| 
| 002| b|dfr| 
| 003| c|ytr| 
+----+----+---+ 

現在我已經添加使用如下面的窗口函數的ROW_NUMBER:

val baseDF = DS1.select(col("KEY1"), col("KEY2"), col("VAL") ,row_number().over(Window.partitionBy(col("KEY1"), col("KEY2")).orderBy(col("KEY1"), col("KEY2").asc)).alias("Row_Num")) 

baseDF.show() 

|KEY1|KEY2|VAL|Row_Num| 
+----+----+---+-------+ 
|001 |a |abc|1  | 
|001 |a |123|2  | 
|002 |b |dfr|1  | 
|003 |c |456|1  | 
|003 |c |ytr|2  | 
+----+----+---+-------+ 

現在三角洲負荷降至:

val delta = List(List("001", "a", "y45") ,List("002", "b", "444")) 
    .map(row => (row(0), row(1), row(2))) 

val DS2 = delta.toDF("KEY1", "KEY2" ,"VAL") 

DS2.show() 

+----+----+---+ 
|KEY1|KEY2|VAL| 
+----+----+---+ 
| 001| a|y45| 
| 002| b|444| 
+----+----+---+ 

所以預計更新的結果應該是

baseDF.show() 

|KEY1|KEY2|VAL|Row_Num| 
+----+----+---+-------+ 
|001 |a |abc|1  | 
|001 |a |123|2  | 
| 001| a|y45|3  | -----> Delta record 
|002 |b |dfr|1  | 
| 002| b|444|2  | -----> Delta record 
|003 |c |456|1  | 
|003 |c |ytr|2  | 
+----+----+---+-------+ 

任何建議,以實現使用dataframes /數據集,這個解決方案? 我們可以通過spark rdd的zipWithIndex實現上述解決方案嗎?與更新的行號碼加增量

回答

2

的一種方式是:1)與大量添加列Row_NumDS2,2)聯合baseDF有了它,和3)計算新的行號,如下所示:

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.expressions.Window 

val combinedDF = baseDF.union(
    DS2.withColumn("Row_Num", lit(Long.MaxValue)) 
) 

val resultDF = combinedDF.select(
    col("KEY1"), col("KEY2"), col("VAL"), row_number().over(
    Window.partitionBy(col("KEY1"), col("KEY2")).orderBy(col("Row_Num")) 
).alias("New_Row_Num") 
) 

resultDF.show 
+----+----+---+-----------+ 
|KEY1|KEY2|VAL|New_Row_Num| 
+----+----+---+-----------+ 
| 003| c|456|   1| 
| 003| c|ytr|   2| 
| 002| b|dfr|   1| 
| 002| b|444|   2| 
| 001| a|abc|   1| 
| 001| a|123|   2| 
| 001| a|y45|   3| 
+----+----+---+-----------+