EDIT: New solution for rows that are only partially null. It avoids joins and instead uses a window function and a distinct...
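// col, lit, first and rank come from functions._; toDF and 'id need the SQL implicits,
// which spark-shell imports automatically
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
case class a(id:Int,val1:String,val2:String,val3:String,val4:String)
val df1 = sc.parallelize(List(
  a(1,null,null,null,null),
  a(2,"A2","A21","A31","A41"),
  a(3,null,null,null,null))).toDF()
val df2 = sc.parallelize(List(
  a(1,"B1",null,"B31","B41"),
  a(2,null,null,null,null),
  a(3,null,null,null,null))).toDF()
val df3 = sc.parallelize(List(
  a(1,"C1","C2","C3","C4"),
  a(2,"C11","C12","C13","C14"),
  a(3,"C11","C12","C13","C14"))).toDF()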
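// True when at least one value column (every column except id) is non-null
val anyNotNull = df1.columns.tail.map(c => col(c).isNotNull).reduce(_ || _)
// Drop the all-null rows and tag each source with its priority in "foo" (1 = df1, 2 = df2, 3 = df3)
val consolidated = {
  df1
    .filter(anyNotNull)
    .withColumn("foo",lit(1))
    .unionAll(df2.filter(anyNotNull).withColumn("foo",lit(2)))
    .unionAll(df3.filter(anyNotNull).withColumn("foo",lit(3)))
}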
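Before the final na.drop.distinct, the windowed select (with w and coalesced as defined after this table) still yields one row per input row:
scala> consolidated.select(coalesced:_*).show()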
+---+----+----+----+----+
| id|val1|val2|val3|val4|
+---+----+----+----+----+
| 1| B1|null| B31| B41|
| 1| B1| C2| B31| B41|
| 3| C11| C12| C13| C14|
| 2| A2| A21| A31| A41|
| 2| A2| A21| A31| A41|
+---+----+----+----+----+
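// Within each id, order the rows by source priority
val w = Window.partitionBy('id).orderBy('foo)
// Per column, the first non-null value seen so far in that order (ignoreNulls = true)
val coalesced = col("id") +: df1.columns.tail.map(c => first(col(c),true).over(w).as(c))
// Drop rows that still contain a null, then collapse the identical complete rows
val finalDF = consolidated.select(coalesced:_*).na.drop.distinct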
scala> finalDF.show()
+---+----+----+----+----+
| id|val1|val2|val3|val4|
+---+----+----+----+----+
| 1| B1| C2| B31| B41|
| 3| C11| C12| C13| C14|
| 2| A2| A21| A31| A41|
+---+----+----+----+----+
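To make the per-column logic easier to follow, here is a minimal plain-Scala sketch (no Spark required) of what first(col, ignoreNulls = true) over the ordered window ends up selecting for each id. The names CoalesceSketch, Rec and coalesceById are hypothetical and only serve the illustration; nulls are modelled as None. Unlike the Spark version, this model does not apply na.drop, so an id whose column is null in every source would keep a None instead of being removed.

```scala
// Plain-Scala model of "first non-null value per column, by source priority".
// Illustrative only: not Spark code and not part of the original answer.
object CoalesceSketch {
  // One row of `consolidated`: id, source priority (foo), nullable values as Options
  final case class Rec(id: Int, foo: Int, vals: Vector[Option[String]])

  // For each id, take per column the first defined value in priority order
  def coalesceById(rows: Seq[Rec]): Map[Int, Vector[Option[String]]] =
    rows.groupBy(_.id).map { case (id, group) =>
      val ordered = group.sortBy(_.foo)
      val width = ordered.head.vals.length
      val merged = Vector.tabulate(width) { i =>
        ordered.iterator.map(_.vals(i)).collectFirst { case Some(v) => v }
      }
      id -> merged
    }

  // The sample rows after the all-null rows have been filtered out
  val sample: Seq[Rec] = Seq(
    Rec(2, 1, Vector(Some("A2"), Some("A21"), Some("A31"), Some("A41"))),
    Rec(1, 2, Vector(Some("B1"), None, Some("B31"), Some("B41"))),
    Rec(1, 3, Vector(Some("C1"), Some("C2"), Some("C3"), Some("C4"))),
    Rec(2, 3, Vector(Some("C11"), Some("C12"), Some("C13"), Some("C14"))),
    Rec(3, 3, Vector(Some("C11"), Some("C12"), Some("C13"), Some("C14")))
  )
}
```

For id 1, val2 is missing in the df2 row, so it falls through to C2 from df3, while the other three columns keep the df2 values.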
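Old solution:
If each row is either entirely null or contains no nulls at all, you can do the following (EDIT: the advantage over the other solutions is that you avoid the distinct).
Data: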
case class a(id:Int,val1:String,val2:String,val3:String,val4:String)
val df1 = sc.parallelize(List(
  a(1,null,null,null,null),
  a(2,"A2","A21","A31","A41"),
  a(3,null,null,null,null))).toDF()
val df2 = sc.parallelize(List(
  a(1,"B1","B21","B31","B41"),
  a(2,null,null,null,null),
  a(3,null,null,null,null))).toDF()
val df3 = sc.parallelize(List(
  a(1,"C1","C2","C3","C4"),
  a(2,"C11","C12","C13","C14"),
  a(3,"C11","C12","C13","C14"))).toDF()
Consolidate:
val consolidated = {
  df1.na.drop.withColumn("foo",lit(1))
    .unionAll(df2.na.drop.withColumn("foo",lit(2)))
    .unionAll(df3.na.drop.withColumn("foo",lit(3)))
}
scala> consolidated.show()
+---+----+----+----+----+---+
| id|val1|val2|val3|val4|foo|
+---+----+----+----+----+---+
| 2| A2| A21| A31| A41| 1|
| 1| B1| B21| B31| B41| 2|
| 1| C1| C2| C3| C4| 3|
| 2| C11| C12| C13| C14| 3|
| 3| C11| C12| C13| C14| 3|
+---+----+----+----+----+---+
Final:
val w = Window.partitionBy('id).orderBy('foo)
val finalDF = consolidated
  .withColumn("foo2",rank().over(w))  // rank 1 = highest-priority source that has this id
  .filter('foo2===1)
  .drop("foo").drop("foo2")
scala> finalDF.show()
+---+----+----+----+----+
| id|val1|val2|val3|val4|
+---+----+----+----+----+
| 1| B1| B21| B31| B41|
| 3| C11| C12| C13| C14|
| 2| A2| A21| A31| A41|
+---+----+----+----+----+
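The rank-based selection can also be read as "per id, keep the row from the source with the smallest foo". A minimal plain-Scala sketch of that reading follows, assuming foo is unique within each id (true here, since each source contributes at most one row per id). RankSketch, Rec and firstPerId are hypothetical names used only for this illustration.

```scala
// Plain-Scala model of rank().over(w) === 1 when foo is unique within each id.
// Illustrative only: not Spark code and not part of the original answer.
object RankSketch {
  // One row of `consolidated`: id, source priority (foo), and the four values
  final case class Rec(id: Int, foo: Int, vals: Vector[String])

  // Keep, per id, the values of the row with the smallest foo
  def firstPerId(rows: Seq[Rec]): Map[Int, Vector[String]] =
    rows.groupBy(_.id).map { case (id, group) => id -> group.minBy(_.foo).vals }

  // The rows shown by consolidated.show() in the old solution
  val sample: Seq[Rec] = Seq(
    Rec(2, 1, Vector("A2", "A21", "A31", "A41")),
    Rec(1, 2, Vector("B1", "B21", "B31", "B41")),
    Rec(1, 3, Vector("C1", "C2", "C3", "C4")),
    Rec(2, 3, Vector("C11", "C12", "C13", "C14")),
    Rec(3, 3, Vector("C11", "C12", "C13", "C14"))
  )
}
```

Because whole rows are selected rather than individual columns, no distinct is needed afterwards, which is the advantage mentioned at the start of the old solution.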
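Could you please provide the expected output for the data frames above? – cheseaux
Added the required final output. – Neha
Is it correct that each row is either entirely null or has no nulls at all? And is the id still present in the data set when the row is null? – Wilmerton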