我使用sparkSql 1.6.2(Java API的),我必須處理以下數據框中具有的價值在2列的列表:星火 - Java的UDF返回多個列
ID AttributeName AttributeValue
0 [an1,an2,an3] [av1,av2,av3]
1 [bn1,bn2] [bv1,bv2]
所需的表是:
ID AttributeName AttributeValue
0 an1 av1
0 an2 av2
0 an3 av3
1 bn1 bv1
1 bn2 bv2
我想我必須使用爆炸功能和自定義UDF功能的組合。
我發現以下資源:
- Explode (transpose?) multiple columns in Spark SQL table
- How do I call a UDF on a Spark DataFrame using JAVA?
,我可以成功運行,上面寫着兩列的例子,在返回前兩個字符串的連接列
UDF2 combineUDF = new UDF2<Seq<String>, Seq<String>, String>() {
public String call(final Seq<String> col1, final Seq<String> col2) throws Exception {
return col1.apply(0) + col2.apply(0);
}
};
context.udf().register("combineUDF", combineUDF, DataTypes.StringType);
t他的問題是編寫UDF的簽名,返回兩列(用Java)。 據我瞭解,我必須定義一個新的StructType如下圖所示,設置爲返回類型之一,但到目前爲止,我沒能有最終的代碼工作
StructType retSchema = new StructType(new StructField[]{
new StructField("@AttName", DataTypes.StringType, true, Metadata.empty()),
new StructField("@AttValue", DataTypes.StringType, true, Metadata.empty()),
}
);
context.udf() .register(「combineUDF」,combineUDF,retSchema);
任何幫助將非常感激。
更新:我想首先實現拉鍊(爲AttributeName,的AttributeValue)所以後來我將只需要應用標準sparkSql爆炸功能:
ID AttName_AttValue
0 [[an1,av1],[an1,av2],[an3,av3]]
1 [[bn1,bv1],[bn2,bv2]]
我建立了下列UDF:
UDF2 combineColumns = new UDF2<Seq<String>, Seq<String>, List<List<String>>>() {
public List<List<String>> call(final Seq<String> col1, final Seq<String> col2) throws Exception {
List<List<String>> zipped = new LinkedList<>();
for (int i = 0, listSize = col1.size(); i < listSize; i++) {
List<String> subRow = Arrays.asList(col1.apply(i), col2.apply(i));
zipped.add(subRow);
}
return zipped;
}
};
但是當我運行的代碼
myDF.select(callUDF("combineColumns", col("AttributeName"), col("AttributeValue"))).show(10);
我得到了以下錯誤消息:
scala.MatchError:[[AN1,AV1],AN1,AV2],AN3,AV3] [[](的類java.util.LinkedList)
看起來組合已經正確執行,但是返回類型並不是Scala中的預期類型。
任何幫助?