0
比方說,我有兩個DataFrames:針對Spark Dataframes的subtractByKey?
headers = ["id", "info"]
a = sc.parallelize([(1, "info1"), (2, "info2"), (3, "info3")]).toDF(headers)
b = sc.parallelize([(2, "info2new")]).toDF(headers)
我想從a
獲得的文件,但在出現在b
a
覆蓋那些行。因此,期望的輸出是
+---+--------+
| id| info|
+---+--------+
| 1| info1|
| 2|info2new|
| 3| info3|
+---+--------+
現在,我可以我DataFrames轉換爲RDDS,並使用subtractByKey
,但是這迫使我要轉換爲RDDS,地圖(鍵,值)元組,然後再轉換回DataFrames。咩。
我環顧四周,然後看到saw the functions except
and subtract
for Spark DataFrames,但這些要求的行是精確重複的,而我並不是。
的方式我正在考慮這樣做,現在是像這樣:
a1 = a.select(*(col(x).alias(x + '_old') for x in a.columns))
b1 = b.select(*(col(x).alias(x + '_new') for x in b.columns))
x = a1.join(b1, a1['id_old'] == b1['id_new"], "outer")
那我就註冊x作爲不是Temptable,寫一個SQL查詢,除非新的數據不爲空,將返回舊數據。但我認爲這不是特別乾淨!
任何好主意?
或者只是執行左外連接用'coalesce' /'情況when'。 – zero323