Update a Spark DataFrame's window-function row_number column for delta data

I need to update the row_number column of a DataFrame when delta data arrives. I have implemented row_number for the base load as follows:

Input data:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val base = List(List("001", "a", "abc"), List("001", "a", "123"), List("003", "c", "456"), List("002", "b", "dfr"), List("003", "c", "ytr"))
  .map(row => (row(0), row(1), row(2)))
val DS1 = base.toDF("KEY1", "KEY2", "VAL")
DS1.show()
+----+----+---+
|KEY1|KEY2|VAL|
+----+----+---+
| 001| a|abc|
| 001| a|123|
| 003| c|456|
| 002| b|dfr|
| 003| c|ytr|
+----+----+---+
Now I have added row_number using a window function as below:
val baseDF = DS1.select(
  col("KEY1"), col("KEY2"), col("VAL"),
  row_number()
    .over(Window.partitionBy(col("KEY1"), col("KEY2")).orderBy(col("KEY1"), col("KEY2").asc))
    .alias("Row_Num"))
baseDF.show()
+----+----+---+-------+
|KEY1|KEY2|VAL|Row_Num|
+----+----+---+-------+
|001 |a |abc|1 |
|001 |a |123|2 |
|002 |b |dfr|1 |
|003 |c |456|1 |
|003 |c |ytr|2 |
+----+----+---+-------+
Now the delta load comes in:
val delta = List(List("001", "a", "y45") ,List("002", "b", "444"))
.map(row => (row(0), row(1), row(2)))
val DS2 = delta.toDF("KEY1", "KEY2" ,"VAL")
DS2.show()
+----+----+---+
|KEY1|KEY2|VAL|
+----+----+---+
| 001| a|y45|
| 002| b|444|
+----+----+---+
So the expected updated result should be:
baseDF.show()
+----+----+---+-------+
|KEY1|KEY2|VAL|Row_Num|
+----+----+---+-------+
|001 |a |abc|1 |
|001 |a |123|2 |
| 001| a|y45|3 | -----> Delta record
|002 |b |dfr|1 |
| 002| b|444|2 | -----> Delta record
|003 |c |456|1 |
|003 |c |ytr|2 |
+----+----+---+-------+
Any suggestions on how to achieve this using DataFrames/Datasets? Can we achieve it with Spark RDD's zipWithIndex, i.e. appending the delta with updated row numbers?
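One possible DataFrame-only sketch (an assumption, not a confirmed answer): take the current maximum Row_Num per (KEY1, KEY2) from the base, number the delta rows within each key, and shift them by that maximum, so the base numbering is left untouched. The `seq`, `max_rn`, and `rn` helper columns are hypothetical names introduced here only for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val spark = SparkSession.builder().master("local[*]").appName("delta-row-num").getOrCreate()
import spark.implicits._

// Rebuild the base and its row numbers as in the question.
val DS1 = List(("001", "a", "abc"), ("001", "a", "123"), ("003", "c", "456"),
               ("002", "b", "dfr"), ("003", "c", "ytr")).toDF("KEY1", "KEY2", "VAL")
val baseDF = DS1.withColumn("Row_Num",
  row_number().over(Window.partitionBy($"KEY1", $"KEY2").orderBy($"KEY1", $"KEY2")))

val DS2 = List(("001", "a", "y45"), ("002", "b", "444")).toDF("KEY1", "KEY2", "VAL")

// Highest existing Row_Num for each (KEY1, KEY2) in the base.
val maxPerKey = baseDF.groupBy("KEY1", "KEY2").agg(max("Row_Num").alias("max_rn"))

// Number delta rows within each key (seq gives a stable in-partition order),
// then shift by the base maximum; coalesce covers keys new to the delta.
val deltaNumbered = DS2
  .withColumn("seq", monotonically_increasing_id())
  .join(maxPerKey, Seq("KEY1", "KEY2"), "left")
  .withColumn("rn", row_number().over(Window.partitionBy($"KEY1", $"KEY2").orderBy($"seq")))
  .withColumn("Row_Num", coalesce($"max_rn", lit(0)) + $"rn")
  .select("KEY1", "KEY2", "VAL", "Row_Num")

val updated = baseDF.union(deltaNumbered)
updated.orderBy("KEY1", "Row_Num").show()
```

On the zipWithIndex idea: that RDD method assigns one global index across the whole dataset, so you would still need per-key arithmetic afterwards; staying in the DataFrame API as sketched above is likely simpler.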