2016-09-28 63 views
12

我有一個數據幀,我試圖展平。作爲該過程的一部分,我想分解它,所以如果我有一列數組,數組的每個值將用於創建一個單獨的行。例如,Spark sql如何在不丟失空值的情況下如何爆炸

id | name | likes 
_______________________________ 
1 | Luke | [baseball, soccer] 

應該成爲

id | name | likes 
_______________________________ 
1 | Luke | baseball 
1 | Luke | soccer 

這是我的代碼

private DataFrame explodeDataFrame(DataFrame df) { 
    DataFrame resultDf = df; 
    for (StructField field : df.schema().fields()) { 
     if (field.dataType() instanceof ArrayType) { 
      resultDf = resultDf.withColumn(field.name(), org.apache.spark.sql.functions.explode(resultDf.col(field.name()))); 
      resultDf.show(); 
     } 
    } 
    return resultDf; 
} 

的問題是,在我的數據,一些陣列列值爲空。在這種情況下,整個行被刪除。所以這個數據幀:

id | name | likes 
_______________________________ 
1 | Luke | [baseball, soccer] 
2 | Lucy | null 

成爲

id | name | likes 
_______________________________ 
1 | Luke | baseball 
1 | Luke | soccer 

,而不是

id | name | likes 
_______________________________ 
1 | Luke | baseball 
1 | Luke | soccer 
2 | Lucy | null 

我怎麼能爆炸我的陣列,這樣我就不會失去空行?

我使用星火1.5.2和Java 8

回答

20

星火2.2+

可以使用explode_outer功能:

import org.apache.spark.sql.functions.explode_outer 

df.withColumn("likes", explode_outer($"likes")).show 

// +---+----+--------+ 
// | id|name| likes| 
// +---+----+--------+ 
// | 1|Luke|baseball| 
// | 1|Luke| soccer| 
// | 2|Lucy| null| 
// +---+----+--------+ 

星火< = 2.1

在斯卡拉,但Java應該是幾乎相同(導入個人功能使用import static)。

import org.apache.spark.sql.functions.{array, col, explode, lit, when} 

val df = Seq(
    (1, "Luke", Some(Array("baseball", "soccer"))), 
    (2, "Lucy", None) 
).toDF("id", "name", "likes") 

df.withColumn("likes", explode(
    when(col("likes").isNotNull, col("likes")) 
    // If null explode an array<string> with a single null 
    .otherwise(array(lit(null).cast("string"))))) 

這裏的想法是基本上具有所需類型的array(NULL)取代NULL。對於複雜型(又名structs),你必須提供完整的模式:

val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y") 

val st = StructType(Seq(
    StructField("_1", IntegerType, false), StructField("_2", StringType, true) 
)) 

dfStruct.withColumn("y", explode(
    when(col("y").isNotNull, col("y")) 
    .otherwise(array(lit(null).cast(st))))) 

dfStruct.withColumn("y", explode(
    when(col("y").isNotNull, col("y")) 
    .otherwise(array(lit(null).cast("struct<_1:int,_2:string>"))))) 

注意

如果陣列Column已經與containsNull集創建false你應該首先改變(用Spark 2.1測試):

df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true))) 
+0

看起來不錯,謝謝!我有一個後續問題:如果我的列類型是一個StructType?我嘗試使用cast(新的StructType()),但是我得到了'數據類型不匹配:THEN和ELSE表達式應該都是相同的類型或強制到一個常見的類型;'我試圖使我的方法儘可能通用,所以它適合所有列類型。 – alexgbelov

+0

此外,要獲取列類型,我使用DataFrame.dtypes()。有沒有更好的方法來獲取列類型? – alexgbelov

+1

a)您必須提供所有字段的完整模式。 b)'dtypes'或'schema'。 – zero323

0

根據已接受的答案,當數組元素是複雜類型時,可能很難用手來定義它(例如,使用大型結構)。

來自動執行我寫了下面的輔助方法:

相關問題