2016-09-27 73 views
1

我使用sparkSql 1.6.2(Java API的),我必須處理以下數據框中具有的價值在2列的列表:星火 - Java的UDF返回多個列

ID AttributeName AttributeValue 
0 [an1,an2,an3] [av1,av2,av3] 
1 [bn1,bn2]  [bv1,bv2] 

所需的表是:

ID AttributeName AttributeValue 
0 an1   av1 
0 an2   av2 
0 an3   av3 
1 bn1   bv1 
1 bn2   bv2 

我想我必須使用爆炸功能和自定義UDF功能的組合。

我發現以下資源:

,我可以成功運行,上面寫着兩列的例子,在返回前兩個字符串的連接列

UDF2 combineUDF = new UDF2<Seq<String>, Seq<String>, String>() { 
     public String call(final Seq<String> col1, final Seq<String> col2) throws Exception { 
      return col1.apply(0) + col2.apply(0); 
     } 
    }; 

context.udf().register("combineUDF", combineUDF, DataTypes.StringType); 

t他的問題是編寫UDF的簽名,返回兩列(用Java)。 據我瞭解,我必須定義一個新的StructType如下圖所示,設置爲返回類型之一,但到目前爲止,我沒能有最終的代碼工作

StructType retSchema = new StructType(new StructField[]{ 
      new StructField("@AttName", DataTypes.StringType, true, Metadata.empty()), 
      new StructField("@AttValue", DataTypes.StringType, true, Metadata.empty()), 
     } 
    ); 

context.udf() .register(「combineUDF」,combineUDF,retSchema);

任何幫助將非常感激。

更新:我想首先實現拉鍊(爲AttributeName,的AttributeValue)所以後來我將只需要應用標準sparkSql爆炸功能:

ID AttName_AttValue 
0 [[an1,av1],[an1,av2],[an3,av3]] 
1 [[bn1,bv1],[bn2,bv2]] 

我建立了下列UDF:

UDF2 combineColumns = new UDF2<Seq<String>, Seq<String>, List<List<String>>>() { 
     public List<List<String>> call(final Seq<String> col1, final Seq<String> col2) throws Exception { 
      List<List<String>> zipped = new LinkedList<>(); 

      for (int i = 0, listSize = col1.size(); i < listSize; i++) { 
       List<String> subRow = Arrays.asList(col1.apply(i), col2.apply(i)); 
       zipped.add(subRow); 
      } 

      return zipped; 
     } 

    }; 

但是當我運行的代碼

myDF.select(callUDF("combineColumns", col("AttributeName"), col("AttributeValue"))).show(10); 

我得到了以下錯誤消息:

scala.MatchError:[[AN1,AV1],AN1,AV2],AN3,AV3] [[](的類java.util.LinkedList)

看起來組合已經正確執行,但是返回類型並不是Scala中的預期類型。

任何幫助?

回答

0

最後,我設法得到了我正在尋找的結果,但可能不是以最有效的方式。

基本上是2步驟:

  • 兩個列表
  • 行爆炸列表的

對於第一步的郵編予定義的以下UDF功能

UDF2 concatItems = new UDF2<Seq<String>, Seq<String>, Seq<String>>() { 
    public Seq<String> call(final Seq<String> col1, final Seq<String> col2) throws Exception { 
     ArrayList zipped = new ArrayList(); 

     for (int i = 0, listSize = col1.size(); i < listSize; i++) { 
      String subRow = col1.apply(i) + ";" + col2.apply(i); 
      zipped.add(subRow); 
     } 

     return scala.collection.JavaConversions.asScalaBuffer(zipped); 
    } 

}; 

然後我用下面的代碼調用它:

DataFrame df2 = df.select(col("ID"), callUDF("concatItems", col("AttributeName"), col("AttributeValue")).alias("AttName_AttValue")); 

在這個階段,DF2看起來像這樣:

ID AttName_AttValue 
0 [[an1,av1],[an1,av2],[an3,av3]] 
1 [[bn1,bv1],[bn2,bv2]] 

然後我叫下面的lambda函數,用於引爆列表爲行:

DataFrame df3 = df2.select(col("ID"),explode(col("AttName_AttValue")).alias("AttName_AttValue_row")); 

在這個階段,DF3樣子即:

ID AttName_AttValue 
0 [an1,av1] 
0 [an1,av2] 
0 [an3,av3] 
1 [bn1,bv1] 
1 [bn2,bv2] 

最後要拆分attrib UTE名稱和值分爲兩個不同的列,我轉換數據框成JavaRDD以便使用地圖功能:

JavaRDD df3RDD = df3.toJavaRDD().map(
      (Function<Row, Row>) myRow -> { 
       String[] info = String.valueOf(myRow.get(1)).split(","); 
       return RowFactory.create(myRow.get(0), info[0], info[1]); 
     }).cache(); 

如果有人有更好的解決方案隨意評論。 我希望它有幫助。