Here is the Spark UDF I use to compute a value from a few columns. I am passing nullable columns as parameters to a Spark SQL UDF.
def spark_udf_func(s: String, i: Int): Boolean = {
  // I'm returning true regardless of the parameters passed to it.
  true
}
val spark_udf = org.apache.spark.sql.functions.udf(spark_udf_func _)
val df = sc.parallelize(Array[(Option[String], Option[Int])](
  (Some("Rafferty"), Some(31)),
  (null, Some(33)),
  (Some("Heisenberg"), Some(33)),
  (Some("Williams"), null)
)).toDF("LastName", "DepartmentID")
df.withColumn("valid", spark_udf(df.col("LastName"), df.col("DepartmentID"))).show()
+----------+------------+-----+
| LastName|DepartmentID|valid|
+----------+------------+-----+
| Rafferty| 31| true|
| null| 33| true|
|Heisenberg| 33| true|
| Williams| null| null|
+----------+------------+-----+
Can anyone explain why the valid column is null for the last row?
When I inspected the Spark plan, I could see that it contains a case/if expression which says that if column 2 (DepartmentID) is null, the result must be null.
== Physical Plan ==
*Project [_1#699 AS LastName#702, _2#700 AS DepartmentID#703, if (isnull(_2#700)) null else UDF(_1#699, _2#700) AS valid#717]
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), assertnotnull(input[0, scala.Tuple2, true])._1), true) AS _1#699, unwrapoption(IntegerType, assertnotnull(input[0, scala.Tuple2, true])._2) AS _2#700]
+- Scan ExternalRDDScan[obj#698]
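If I read the plan correctly, the guard Spark inserted is roughly equivalent to me writing the column expression myself like this (my own rough reading of the plan, not code I actually wrote):

import org.apache.spark.sql.functions.{lit, when}

// Rough DataFrame-level equivalent of the guard in the plan: if DepartmentID
// is null, skip the UDF entirely and produce null; otherwise call the UDF.
df.withColumn("valid",
  when(df.col("DepartmentID").isNull, lit(null))
    .otherwise(spark_udf(df.col("LastName"), df.col("DepartmentID")))).show()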
Why does Spark behave this way?
Why does it happen only for the Integer column?
What am I doing wrong here, and what is the correct way to handle nulls when a UDF parameter can be null?
Also see https://stackoverflow.com/questions/42791912/how-to-deal-with-spark-udf-input-output-of-primitive-nullable-type
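For context, here is a minimal sketch of the workaround I am experimenting with (the name spark_udf_row and the Option handling are my own; I am not sure this is the idiomatic approach): pass the nullable columns through struct(...) so the UDF receives a Row and can check for nulls itself, instead of Spark short-circuiting on the primitive Int parameter.

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{struct, udf}

// Sketch: wrap the nullable columns in a struct so the UDF receives a Row.
// Inside the UDF each field can be checked for null explicitly.
val spark_udf_row = udf { (r: Row) =>
  val lastName: Option[String] = if (r.isNullAt(0)) None else Some(r.getString(0))
  val departmentId: Option[Int] = if (r.isNullAt(1)) None else Some(r.getInt(1))
  // lastName / departmentId would feed the real computation; like the
  // original UDF, this sketch just returns true for every row.
  true
}

df.withColumn("valid",
  spark_udf_row(struct(df.col("LastName"), df.col("DepartmentID")))).show()

With this I would expect the UDF to be invoked even for the Williams row, but I would like to know whether this, or declaring the parameter as java.lang.Integer instead of Int, is the recommended way.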