2017-09-05 42 views
4

這是我用來使用少量列計算值的Spark UDF。將可空列作爲參數傳遞給Spark SQL UDF

def spark_udf_func(s: String, i:Int): Boolean = { 
    // I'm returning true regardless of the parameters passed to it. 
    true 
} 

val spark_udf = org.apache.spark.sql.functions.udf(spark_udf_func _) 

val df = sc.parallelize(Array[(Option[String], Option[Int])](
    (Some("Rafferty"), Some(31)), 
    (null, Some(33)), 
    (Some("Heisenberg"), Some(33)), 
    (Some("Williams"), null) 
)).toDF("LastName", "DepartmentID") 

df.withColumn("valid", spark_udf(df.col("LastName"), df.col("DepartmentID"))).show() 

+----------+------------+-----+ 
| LastName|DepartmentID|valid| 
+----------+------------+-----+ 
| Rafferty|   31| true| 
|  null|   33| true| 
|Heisenberg|   33| true| 
| Williams|  null| null| 
+----------+------------+-----+ 

任何人都可以解釋爲什麼有效列的值爲null爲最後一行?

當我檢查火花計劃時,我能夠計算出該計劃有一個案例情況,它表示如果column2(DepartmentID)爲null,它必須返回null。

==實際規劃==

*Project [_1#699 AS LastName#702, _2#700 AS DepartmentID#703, if (isnull(_2#700)) null else UDF(_1#699, _2#700) AS valid#717] 
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), assertnotnull(input[0, scala.Tuple2, true])._1), true) AS _1#699, unwrapoption(IntegerType, assertnotnull(input[0, scala.Tuple2, true])._2) AS _2#700] 
    +- Scan ExternalRDDScan[obj#698] 

爲什麼我們在星火這樣的行爲?
爲什麼只有Integer列?
這是什麼,我在這裏做錯了什麼,當UDF參數爲空時處理空值的正確方法是什麼?

+0

也看到https://stackoverflow.com/questions/42791912/how-to-deal-with-spark-udf-input-output-of-primitive-nullable-type –

回答

3

問題是null不是斯卡拉詮釋一個有效的值(這是後盾值),而這是一個字符串的有效值。 Int等同於java int原語並且必須有一個值。這意味着當值爲空時udf不能被調用,因此null仍然存在。

有解決這個方法有兩種:

  1. 更改函數接受java.lang.Integer中(這是一個對象,可以爲null)
  2. 如果你不能改變的功能,你可以使用when/otherwise在null的情況下做一些特殊的事情。例如,當(COL( 「整數關口」)。參考isNull,someValue中)。否則(原始呼叫)

這方面的一個很好的解釋可以發現here

+1

還有第三個選項可以讓你堅持Scala Int:將參數打包在一個struct中(使用'df.withColumn(「valid」,spark_udf(struct(df.col(「LastName」),df.col(「DepartmentID」))))')並使用'Row'作爲udf的輸入參數。在udf裏面,你可以使用'row.isNullAt(i:Int)'檢查行的空值 –

0

要接受空,請使用整數(Java數據類型而不是斯卡拉智力)

def spark_udf_func(s: String, i:Integer): Boolean = { 
    // I'm returning true regardless of the parameters passed to it. 
    if(i == null) false else true 
}