2016-02-15 48 views
0

我有兩個Spark DataFrames,其中一個有兩個cols,id和Tag。第二個DataFrame有一個id col,但沒有標籤。第一個Dataframe本質上是一個字典,每個id出現一次,而第二個DataFrame和id可能會出現多次。我需要的是在第二個DataFrame中創建一個新的列,其中標記作爲每行(在第二個DataFrame中)的id的函數。我認爲這可以通過首先轉換爲RDD來實現..但是我認爲必須有更優雅的使用DataFrame(Java)的方式。示例:給定a df1行→id:0,標記:「A」,a df2 Row1-> id:0,Tag:null,a df2 Row2-> id:0,Tag:「B」,我需要在生成的DataFrame df3中創建一個等於df1(id = 0)=「A」如果df2標記爲null,但保留原始標記如果不爲空=>導致df3 Row1-> id: 0,標記:「A」df3 Row2-> id:0,標記:「B」。希望這個例子很明顯。如何基於第二個DataFrame(Java)在Spark DataFrame中創建新列?

| ID | No. | Tag | new Tag Col | 
| 1 | 10002 | A |  A  | 
| 2 | 10003 | B |  B  | 
| 1 | 10004 | null |  A  | 
| 2 | 10005 | null |  B  | 
+0

爲什麼一個簡單的'LEFT OUTER JOIN'不會爲你工作? – zero323

+0

我編輯了這個問題,並將查看LOJ .. – Kai

+0

LOJ並沒有完全解決它,但我認爲如果我跟着它與udf()然後我得到我所需要的。謝謝, – Kai

回答

1

所有你需要的是在這裏左外連接和​​3210:

import org.apache.spark.sql.functions.coalesce 

val df = sc.parallelize(Seq(
    (1, 10002, Some("A")), (2, 10003, Some("B")), 
    (1, 10004, None), (2, 10005, None) 
)).toDF("id", "no", "tag") 

val lookup = sc.parallelize(Seq(
    (1, "A"), (2, "B") 
)).toDF("id", "tag") 


df.join(lookup, df.col("id").equalTo(lookup.col("id")), "leftouter") 
    .withColumn("new_tag", coalesce(df.col("tag"), lookup.col("tag"))) 

這應該幾乎相同的Java版本。