2017-07-30

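How do I create a UDF for an outer join? Say, for example, I have columns of the types below and a user-defined function to use when joining them: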

ColA: String 
ColB: Seq[Row] 

DF1:

ColA ColB 
1 [(1,2),(1,3)] 
2 [(2,3),(3,4)] 

DF2:

ColA ColB 
1 [(1,2),(1,4)] 
3 [(2,5),(3,4)] 

Result:

ColA newCol
1    [(1,2),(1,3)]
2    [(2,3),(3,4)]
3    [(2,5),(3,4)]
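To make the expected result concrete, here is a plain-Scala sketch (no Spark involved) of what a full outer join on ColA followed by "keep DF1's ColB, else DF2's" does with the data above. It assumes each ColB element can be modeled as an (Int, Int) tuple and uses Option where Spark would produce null; OuterJoinModel and its methods are hypothetical names used only for illustration.

```scala
// Hypothetical plain-Scala model of the outer join above (no Spark).
object OuterJoinModel {
  // Each (x, y) tuple stands in for one struct element of ColB
  type Pairs = Seq[(Int, Int)]

  val df1: Map[Int, Pairs] = Map(1 -> Seq((1, 2), (1, 3)), 2 -> Seq((2, 3), (3, 4)))
  val df2: Map[Int, Pairs] = Map(1 -> Seq((1, 2), (1, 4)), 3 -> Seq((2, 5), (3, 4)))

  // Every key from either side appears exactly once; a missing side is None,
  // where Spark's "outer" join would put a null ColB
  def fullOuterJoin(left: Map[Int, Pairs], right: Map[Int, Pairs]): Seq[(Int, Option[Pairs], Option[Pairs])] =
    (left.keySet ++ right.keySet).toSeq.sorted.map(k => (k, left.get(k), right.get(k)))

  // Keep DF1's ColB when present, otherwise fall back to DF2's
  // (.get is safe: each key comes from at least one side)
  def newCol(left: Map[Int, Pairs], right: Map[Int, Pairs]): Seq[(Int, Pairs)] =
    fullOuterJoin(left, right).map { case (k, l, r) => (k, l.orElse(r).get) }

  def main(args: Array[String]): Unit =
    newCol(df1, df2).foreach(println)
}
```

The rows for keys 2 and 3 are exactly the ones where one side is missing, which is why the real join needs a null-aware choice between the two ColB columns.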

Example code:

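import org.apache.spark.sql.functions.{col, when}

// Outer join on ColA; take DF2's ColB when DF1's is null, otherwise DF1's
val joinDf = DF1.join(DF2, DF1("ColA") === DF2("ColA"), "outer")
    .withColumn("newCol", when(DF1("ColB").isNull, DF2("ColB"))
     .otherwise(when(DF2("ColB").isNull, DF1("ColB")).otherwise(DF1("ColB"))))
    .select(col("colA"), col("newCol"))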

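import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Intended rule: use whichever side is non-null, preferring DF1's value
val joinUdf = udf((a: Seq[Row], b: Seq[Row]) => (a, b) match {
    case (null, right) => right
    case (left, null) => left
    case (left, _) => left
})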

This throws an error:

java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Row is not supported
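The merge rule the UDF is meant to apply can be checked on its own in plain Scala, independent of Spark. The sketch below is a hypothetical MergeRule.merge that models the struct elements as (Int, Int) tuples and accepts null on either side, as Spark passes null for a missing array after an outer join. As far as I know, the exception itself comes from the declared return type: Spark's udf cannot derive a schema for Row, whereas a Scala tuple type such as Seq[(Int, Int)] is one it can handle, so converting the Rows to tuples before returning them is one possible workaround.

```scala
// Hypothetical plain-Scala version of the intended merge rule (no Spark).
object MergeRule {
  // Each (x, y) tuple stands in for one struct element of ColB
  type Pairs = Seq[(Int, Int)]

  def merge(a: Pairs, b: Pairs): Pairs = (a, b) match {
    case (null, right) => right // DF1 side missing: use DF2's value
    case (left, null)  => left  // DF2 side missing: use DF1's value
    case (left, _)     => left  // both present: keep DF1's value
  }
}
```

Binding fresh names (left, right) in each case keeps the returned value tied to the matched position rather than to the lambda's parameters.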


What error are you getting? Please update the question with it. –


Could you update the question with your DF1 schema? –


Seq[Row] isn't workable in a DataFrame; it might work in an RDD. –

Answer

Given the schema of the first DataFrame, DF1:

root
 |-- ColA: integer (nullable = false)
 |-- ColB: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: integer (nullable = false)

You have to rename the ColB column of your DF2 to ColC, so that DF2 looks like this:

+----+--------------+
|ColA|ColC          |
+----+--------------+
|1   |[[1,2], [1,3]]|
|3   |[[2,5], [3,4]]|
+----+--------------+

schema

root
 |-- ColA: integer (nullable = false)
 |-- ColC: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: integer (nullable = false)

Then use the code below. You don't even need a udf; the built-in when function is enough:

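import org.apache.spark.sql.functions.{col, when}

// Joining on Seq("ColA") keeps a single ColA column, filled from whichever side has the key
// When DF1's ColB is null (key only in DF2), take ColC; otherwise keep ColB
val joinDf = DF1.join(DF2, Seq("ColA"), "outer")
    .withColumn("newCol", when(DF1("ColB").isNull, col("ColC"))
    .otherwise(col("ColB")))
    .select(col("colA"), col("newCol"))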

You should get the desired output:

+----+--------------+
|colA|newCol        |
+----+--------------+
|1   |[[1,2], [1,3]]|
|3   |[[2,5], [3,4]]|
|2   |[[2,3], [3,4]]|
+----+--------------+

root
 |-- colA: integer (nullable = true)
 |-- newCol: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: integer (nullable = false)
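As a sanity check on the answer's logic, the expression when(DF1("ColB").isNull, col("ColC")).otherwise(col("ColB")) can be modeled row by row in plain Scala. The sketch assumes the joined rows look as below (the question's DF1 joined with the answer's renamed DF2, with null where a side is missing) and models the struct elements as (Int, Int) tuples; WhenOtherwiseModel is a hypothetical name. In Spark itself, coalesce(col("ColB"), col("ColC")) from org.apache.spark.sql.functions should express the same fallback.

```scala
// Hypothetical plain-Scala model of the answer's when/otherwise column (no Spark).
object WhenOtherwiseModel {
  // Each (x, y) tuple stands in for one struct element of ColB / ColC
  type Pairs = Seq[(Int, Int)]

  // Mirrors when(b.isNull, c).otherwise(b) for a single row
  def newCol(b: Pairs, c: Pairs): Pairs = if (b == null) c else b

  // (ColA, ColB, ColC) rows as an outer join on Seq("ColA") would produce them
  val joined: Seq[(Int, Pairs, Pairs)] = Seq(
    (1, Seq((1, 2), (1, 3)), Seq((1, 2), (1, 3))),
    (2, Seq((2, 3), (3, 4)), null),
    (3, null, Seq((2, 5), (3, 4)))
  )

  // Equivalent of .select(col("colA"), col("newCol"))
  val result: Seq[(Int, Pairs)] = joined.map { case (a, b, c) => (a, newCol(b, c)) }
}
```

Because the join key comes out as a single non-null ColA, only the array column needs the null check, which is why no UDF is required.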

Was this answer helpful? :) If it helped you, please accept and upvote it. –