2017-03-31 87 views
0

我使用此代碼創建數據幀:Spark數據集使用空值減少?

val data = List(
    List(444.1235D), 
    List(67.5335D), 
    List(69.5335D), 
    List(677.5335D), 
    List(47.5335D), 
    List(null) 
) 

    val rdd = sparkContext.parallelize(data).map(Row.fromSeq(_)) 
    val schema = StructType(Array(
    StructField("value", DataTypes.DoubleType, true) 
)) 

    val df = sqlContext.createDataFrame(rdd, schema) 

然後我申請我的UDF它:

val multip: Dataset[Double] = df.select(doubleUdf(df("value"))).as[Double] 

,然後我想用這個數據集減輕:

val multipl = multip.reduce(_ * _) 

在這裏,我得到了0.0作爲結果。 此外,我試着用同樣的結果篩選出空

val multipl = multip.filter(_ != null).reduce(_ * _) 

。 如果我從數據中刪除空值,那麼所有工作都應該如此。如何使用空值減少工作量?

我的UDF的定義是這樣的:

val doubleUdf: UserDefinedFunction = udf((v: Any) => Try(v.toString.toDouble).toOption) 
+1

什麼是doubleUdf定義爲。 –

+0

@JustinPihony我已將它添加到問題 – user2975535

回答

5

我會與你doubleUdf功能轉換值雙打很強的假設回答,而不是使用一個選項包裝你正在轉向空到0.0零點。所以,如果你想保持下降零點的邏輯,然後過濾器之前別的:

df.na.drop.select(doubleUdf(df("value"))).as[Double] 
+0

其實我的udf返回選項,因爲這裏推薦[鏈接] http://stackoverflow.com/questions/32357164/sparksql-how-to-deal-with-null-values-in -user-defined-function [/ link]將它加到問題 – user2975535

+0

已增加,但是在這裏,select(doubleUdf(df(「value」)))是完全多餘的。數據已經是'DoubleType'了,如果不是,建議'cast'會更好,你不覺得嗎? – zero323

+0

這僅僅是一個例子。這個函數假設適用於其他數據類型 – user2975535

2

首先,我會問你爲什麼即使null處理的。我會評估我讀取數據的方式,以確保不會發生。

然後我會注意,您可以消除從null你的內存List之前,你甚至可以在RDD水平這樣的例子:

data.flatMap(Option(_)).flatten

但如果你必須在處理null RDD水平,你有選擇的(沒有雙關語意):

sparkContext.parallelize(data).filter(!_.contains(null))

sparkContext.parallelize(data).map(_.flatMap(Option(_))).filter(_.nonEmpty)

我更喜歡後者。我不喜歡在Scala代碼中查看null

由於Spark無法優化UDF,所以我會遠離基於UDF的解決方案,並且遺憾地丟失了Spark的優化功能,而不是像null那樣失敗。