Performing a join on multiple DataFrames in Spark

2016-10-11 40 views

3

I have 3 different processes generating 3 dataframes. Each dataframe has columns with the same names. My dataframes look like this:

id val1 val2  val3 val4 
1 null null  null null 
2 A2  A21  A31  A41 

id val1  val2  val3  val4 
1 B1  B21  B31  B41 
2 null  null  null  null 

id val1  val2  val3 val4 
1 C1  C2  C3  C4 
2 C11  C12  C13  C14 

From these 3 dataframes, I want to create two dataframes (final and consolidated). For final, the order of preference is (val1 != null): dataframe 1 > dataframe 2 > dataframe 3.

If the result is in dataframe 1, I will store that row in the final dataframe.

My final result should be:

id finalVal1 finalVal2 finalVal3 finalVal4 
1  B1   B21   B31   B41 
2  A2   A21   A31   A41 

The consolidated dataframe will store the results from all 3.

How can I do this efficiently?

+0

Could you please provide the expected output for the dataframes above? – cheseaux

+0

Added the final output requirement – Neha

+0

Is it correct that rows are either entirely null or have no nulls at all? And when a row is null, is its id still present in the dataset? – Wilmerton

Answers

5

If I understand correctly, for each row you want to find the first non-null values, looking first into the first table, then into the second table, and then into the third table.

You simply need to join the three tables on id, and then use the coalesce function to get the first non-null element:

import org.apache.spark.sql.functions._ 

val df1 = sc.parallelize(Seq(
    (1,null,null,null,null), 
    (2,"A2","A21","A31", "A41")) 
).toDF("id", "val1", "val2", "val3", "val4") 

val df2 = sc.parallelize(Seq(
    (1,"B1","B21","B31", "B41"), 
    (2,null,null,null,null)) 
).toDF("id", "val1", "val2", "val3", "val4") 

val df3 = sc.parallelize(Seq(
    (1,"C1","C2","C3","C4"), 
    (2,"C11","C12","C13", "C14")) 
).toDF("id", "val1", "val2", "val3", "val4") 

val consolidated = df1.join(df2, "id").join(df3, "id").select(
    df1("id"), 
    coalesce(df1("val1"), df2("val1"), df3("val1")).as("finalVal1"), 
    coalesce(df1("val2"), df2("val2"), df3("val2")).as("finalVal2"), 
    coalesce(df1("val3"), df2("val3"), df3("val3")).as("finalVal3"), 
    coalesce(df1("val4"), df2("val4"), df3("val4")).as("finalVal4") 
) 

This gives you the expected output:

+---+---------+---------+---------+---------+ 
| id|finalVal1|finalVal2|finalVal3|finalVal4| 
+---+---------+---------+---------+---------+ 
|  1|       B1|      B21|      B31|      B41| 
|  2|       A2|      A21|      A31|      A41| 
+---+---------+---------+---------+---------+ 
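If there were many value columns, the same coalesce pattern could also be generated programmatically instead of written out by hand. A minimal sketch building on the df1/df2/df3 above (it assumes the column names are identical across the three frames, as in the question):

```scala
// Build a coalesce(...) expression for every column except "id",
// aliasing each result as "finalValN" to match the expected output.
val finalCols = df1.columns.filter(_ != "id").map { c =>
  coalesce(df1(c), df2(c), df3(c)).as("final" + c.capitalize)
}

val finalDF = df1.join(df2, "id").join(df3, "id")
  .select((df1("id") +: finalCols): _*)
```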
+0

And if the values in all 3 are null, I would like to get null? – Neha

+0

Yes, 'coalesce' returns the first non-null value, or null if everything is null – cheseaux

-2

If they come from three different database tables, I would filter them on the server using pushdown filters, and join them together using the dataframe join functions.

If they don't come from database tables, you can use the filter and map higher-order functions to achieve the same thing in parallel.
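As a rough sketch of the pushdown idea (assuming a SparkSession named spark; the JDBC URL, credentials, and table names are placeholders for illustration only):

```scala
import org.apache.spark.sql.functions._

// Hypothetical example: reading three tables over JDBC. A filter applied
// right after the read is a candidate for predicate pushdown, so the
// database server can discard non-matching rows before they cross the network.
val opts = Map(
  "url"      -> "jdbc:postgresql://dbhost/mydb", // placeholder connection
  "user"     -> "user",
  "password" -> "secret")

def load(table: String) =
  spark.read.format("jdbc")
    .options(opts + ("dbtable" -> table))
    .load()
    .filter(col("val1").isNotNull) // pushed down to the server where possible

val t1 = load("table1")
val t2 = load("table2")
val t3 = load("table3")

// Then combine the pre-filtered frames with the dataframe join functions
val joined = t1.join(t2, Seq("id")).join(t3, Seq("id"))
```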

0

Edit: a new solution for partially null rows. It avoids the join and instead uses window functions and distinct...

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.expressions.Window 

case class a(id:Int,val1:String,val2:String,val3:String,val4:String) 

val df1 = sc.parallelize(List(
a(1,null,null,null,null), 
a(2,"A2","A21","A31","A41"), 
a(3,null,null,null,null))).toDF() 

val df2 = sc.parallelize(List(
a(1,"B1",null,"B31","B41"), 
a(2,null,null,null,null), 
a(3,null,null,null,null))).toDF() 

val df3 = sc.parallelize(List(
a(1,"C1","C2","C3","C4"), 
a(2,"C11","C12","C13","C14"), 
a(3,"C11","C12","C13","C14"))).toDF() 

val anyNotNull = df1.columns.tail.map(c => col(c).isNotNull).reduce(_ || _) 

val consolidated = { 
    df1 
    .filter(anyNotNull) 
    .withColumn("foo",lit(1)) 
    .unionAll(df2.filter(anyNotNull).withColumn("foo",lit(2))) 
    .unionAll(df3.filter(anyNotNull).withColumn("foo",lit(3))) 
} 

val w = Window.partitionBy('id).orderBy('foo) 

val coalesced = col("id") +: df1.columns.tail.map(c => first(col(c),true).over(w).as(c)) 

scala> consolidated.select(coalesced:_*).show() 
+---+----+----+----+----+ 
| id|val1|val2|val3|val4| 
+---+----+----+----+----+ 
|  1|  B1|null| B31| B41| 
|  1|  B1|  C2| B31| B41| 
|  3| C11| C12| C13| C14| 
|  2|  A2| A21| A31| A41| 
|  2|  A2| A21| A31| A41| 
+---+----+----+----+----+ 

val finalDF = consolidated.select(coalesced:_*).na.drop.distinct 

scala> finalDF.show() 
+---+----+----+----+----+ 
| id|val1|val2|val3|val4| 
+---+----+----+----+----+ 
| 1| B1| C2| B31| B41| 
| 3| C11| C12| C13| C14| 
| 2| A2| A21| A31| A41| 
+---+----+----+----+----+ 

Old solution:

If you only ever have entirely null rows or no nulls at all, you can do it like this (edit: the advantage over the other solution is that you avoid the distinct):

Data:

case class a(id:Int,val1:String,val2:String,val3:String,val4:String) 

val df1 = sc.parallelize(List(
a(1,null,null,null,null), 
a(2,"A2","A21","A31","A41"), 
a(3,null,null,null,null))).toDF() 
val df2 = sc.parallelize(List(
a(1,"B1","B21","B31","B41"), 
a(2,null,null,null,null), 
a(3,null,null,null,null))).toDF() 
val df3 = sc.parallelize(List(
a(1,"C1","C2","C3","C4"), 
a(2,"C11","C12","C13","C14"), 
a(3,"C11","C12","C13","C14"))).toDF() 

Consolidation:

val consolidated = { 
    df1.na.drop.withColumn("foo",lit(1)) 
    .unionAll(df2.na.drop.withColumn("foo",lit(2))) 
    .unionAll(df3.na.drop.withColumn("foo",lit(3))) 
} 

scala> consolidated.show() 
+---+----+----+----+----+---+ 
| id|val1|val2|val3|val4|foo| 
+---+----+----+----+----+---+ 
| 2| A2| A21| A31| A41| 1| 
| 1| B1| B21| B31| B41| 2| 
| 1| C1| C2| C3| C4| 3| 
| 2| C11| C12| C13| C14| 3| 
| 3| C11| C12| C13| C14| 3| 
+---+----+----+----+----+---+ 

Final:

val w = Window.partitionBy('id).orderBy('foo) 
val finalDF = consolidated 
    .withColumn("foo2",rank().over(w)) 
    .filter('foo2===1) 
    .drop("foo").drop("foo2") 

scala> finalDF.show() 
+---+----+----+----+----+ 
| id|val1|val2|val3|val4| 
+---+----+----+----+----+ 
| 1| B1| B21| B31| B41| 
| 3| C11| C12| C13| C14| 
| 2| A2| A21| A31| A41| 
+---+----+----+----+----+ 
+0

Your solution is overly complicated, and the OP doesn't have a third row in their example, which is confusing – cheseaux

+0

I don't see a problem with improving the coverage of the test example. The OP asked for an efficient solution, not a simple one. – Wilmerton

+0

It is confusing. Do you really think that using window functions, distinct, filter, drop, map/reduce, case classes, etc. is more efficient than a join with coalesce? – cheseaux