2016-08-25 31 views
0

比方說,我有兩個DataFrames:針對Spark Dataframes的subtractByKey?

headers = ["id", "info"] 

a = sc.parallelize([(1, "info1"), (2, "info2"), (3, "info3")]).toDF(headers) 
b = sc.parallelize([(2, "info2new")]).toDF(headers) 

我想從a獲得的文件,但在出現在ba覆蓋那些行。因此,期望的輸出是

+---+--------+ 
| id| info| 
+---+--------+ 
| 1| info1| 
| 2|info2new| 
| 3| info3| 
+---+--------+ 

現在,我可以我DataFrames轉換爲RDDS,並使用subtractByKey,但是這迫使我要轉換爲RDDS,地圖(鍵,值)元組,然後再轉換回DataFrames。咩。

我環顧四周,然後看到saw the functions except and subtract for Spark DataFrames,但這些要求的行是精確重複的,而我並不是。

的方式我正在考慮這樣做,現在是像這樣:

a1 = a.select(*(col(x).alias(x + '_old') for x in a.columns)) 
b1 = b.select(*(col(x).alias(x + '_new') for x in b.columns)) 
x = a1.join(b1, a1['id_old'] == b1['id_new"], "outer") 

那我就註冊x作爲不是Temptable,寫一個SQL查詢,除非新的數據不爲空,將返回舊數據。但我認爲這不是特別乾淨!

任何好主意?

+0

或者只是執行左外連接用'coalesce' /'情況when'。 – zero323

回答

0

不知道最好的方法,但你可以用和一些清潔東西left outerjoin

斯卡拉:

// UDF to choose info value 
val newInfo = udf[String,String,String]((infoA,infoB) => { 
    if (infoB == null) 
    infoA 
    else 
    infoB 
}) 

// join -> add col("newInfo") -> drop info cols -> rename "newInfo" 
a.join(b, Seq("id"), "left_outer") 
    .withColumn("newInfo", newInfo(a("info"), b("info"))) 
    .drop(a("info")) 
    .drop(b("info")) 
    .withColumnRenamed("newInfo", "info") 
    .show() 
相關問題