2017-07-26 69 views
0

我有一個類型集的列,我使用spark數據集API的collect_set(),它返回包裝數組的包裝數組。我想從嵌套包裝數組的所有值中獲取單個數組。我怎樣才能做到這一點?WrapedArray的WrappedArray到java數組

例如, Cassandra表:

Col1 
{1,2,3} 
{1,5} 

我正在使用Spark數據集API。
row.get(0)返回包裝數組的包裝數組。

回答

4

考慮你有Dataset<Row> ds其中有value列。

+-----------------------+ 
|value     | 
+-----------------------+ 
|[WrappedArray(1, 2, 3)]| 
+-----------------------+ 

而且它有以下模式

root 
|-- value: array (nullable = true) 
| |-- element: array (containsNull = true) 
| | |-- element: integer (containsNull = false) 

使用UDF

定義UDF1像下面。

static UDF1<WrappedArray<WrappedArray<Integer>>, List<Integer>> getValue = new UDF1<WrappedArray<WrappedArray<Integer>>, List<Integer>>() { 
     public List<Integer> call(WrappedArray<WrappedArray<Integer>> data) throws Exception { 
     List<Integer> intList = new ArrayList<Integer>(); 
     for(int i=0; i<data.size(); i++){ 
      intList.addAll(JavaConversions.seqAsJavaList(data.apply(i))); 
     } 
     return intList; 
    } 
}; 

註冊和呼叫UDF1像下面

import static org.apache.spark.sql.functions.col; 
import static org.apache.spark.sql.functions.callUDF; 
import scala.collection.JavaConversions; 

//register UDF 
spark.udf().register("getValue", getValue, DataTypes.createArrayType(DataTypes.IntegerType)); 

//Call UDF 
Dataset<Row> ds1 = ds.select(col("*"), callUDF("getValue", col("value")).as("udf-value")); 
ds1.show(); 

使用爆炸功能

import static org.apache.spark.sql.functions.col; 
import static org.apache.spark.sql.functions.explode; 

Dataset<Row> ds2 = ds.select(explode(col("value")).as("explode-value")); 
ds2.show(false); 
+0

是的,可以完成,我試過了其他我分解了這些集合,然後使用collect_set()將它們聚合在一起,所以只有一個數組,你告訴我要爆炸collect_set()的結果,在這兩種情況下,我都有一個問題,那就是是否會有這也是我選擇flatten的原因,你也可以指點我一些教程,書籍等等,用於spark + java(而不是scala)+ dataset api – rohanagarwal

+0

我編輯了我的答案,以使用UDF獲取數組。希望這可以幫助。 – abaghel

0

如果你有一個數據框,你可以使用UDF來flattern列表 下面是簡單的例子,對於DF1

import spark.implicits._ 

import org.apache.spark.sql.functions._ 
//create a dummy data 

val df = Seq(
    (1, List(1,2,3)), 
    (1, List (5,7,9)), 
    (2, List(4,5,6)), 
    (2,List(7,8,9)) 
).toDF("id", "list") 

val df1 = df.groupBy("id").agg(collect_set($"list").as("col1")) 

df1.show(false) 

輸出:

+---+----------------------------------------------+ 
|id |col1           | 
+---+----------------------------------------------+ 
|1 |[WrappedArray(1, 2, 3), WrappedArray(5, 7, 9)]| 
|2 |[WrappedArray(7, 8, 9), WrappedArray(4, 5, 6)]| 
+---+----------------------------------------------+ 


val testUDF = udf((list: Seq[Seq[Integer]]) => {list.flatten}) 


df1.withColumn("newCol", testUDF($"col1")).show(false) 

輸出

+---+----------------------------------------------+------------------+ 
|id |col1           |newCol   | 
+---+----------------------------------------------+------------------+ 
|1 |[WrappedArray(1, 2, 3), WrappedArray(5, 7, 9)]|[1, 2, 3, 5, 7, 9]| 
|2 |[WrappedArray(7, 8, 9), WrappedArray(4, 5, 6)]|[7, 8, 9, 4, 5, 6]| 
+---+----------------------------------------------+------------------+ 

我希望這可以幫助!

+0

你可以請張貼相當於Java代碼UDF。我在Seq >上看到了這種扁平化功能,但無法正確使用它。 – rohanagarwal

+0

我希望這可以幫助https://stackoverflow.com/questions/35348058/how-do-i-call-a-udf-on-a-spark-dataframe-using-java –

+0

其實我想實現扁平化,它並不像Java中的list.flatten那麼簡單,可能是因爲Scala更豐富。文件爲flatten是單行,對我來說沒有意義:( – rohanagarwal