2016-12-08

I would like to fill NaN values in Spark with the last known good observation (see: Spark/Scala: fill nan with last good observation, Spark map partitions to fill NaN values).

My current solution uses a window function to accomplish the task. But this is not good, as all the values are mapped into a single partition. Something like `val imputed: RDD[FooBar] = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) }` should work better. But strangely, my fill function is not executed. What is wrong with my code?

+----------+--------------------+ 
|  foo|     bar| 
+----------+--------------------+ 
|2016-01-01|    first| 
|2016-01-02|    second| 
|  null|  noValidFormat| 
|2016-01-04|lastAssumingSameDate| 
+----------+--------------------+ 

Here is the full example code:

import java.sql.Date 

import org.apache.log4j.{ Level, Logger } 
import org.apache.spark.SparkConf 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.SparkSession 

case class FooBar(foo: Date, bar: String) 

object WindowFunctionExample extends App {

  Logger.getLogger("org").setLevel(Level.WARN)

  val conf: SparkConf = new SparkConf()
    .setAppName("foo")
    .setMaster("local[*]")

  val spark: SparkSession = SparkSession
    .builder()
    .config(conf)
    .enableHiveSupport()
    .getOrCreate()

  import spark.implicits._

  val myDff = Seq(
    ("2016-01-01", "first"),
    ("2016-01-02", "second"),
    ("2016-wrongFormat", "noValidFormat"),
    ("2016-01-04", "lastAssumingSameDate"))

  val recordsDF = myDff
    .toDF("foo", "bar")
    .withColumn("foo", 'foo.cast("Date"))
    .as[FooBar]
  recordsDF.show

  def notMissing(row: FooBar): Boolean = {
    row.foo != null
  }

  // For every partition, remember the last row that has a valid date.
  val toCarry = recordsDF.rdd.mapPartitionsWithIndex {
    case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption))
  }.collectAsMap
  println("###################### carry ")
  println(toCarry)
  toCarry.foreach(println)
  println("###################### carry ")
  val toCarryBd = spark.sparkContext.broadcast(toCarry)

  def fill(i: Int, iter: Iterator[FooBar]): Iterator[FooBar] = {
    var lastNotNullRow: FooBar = toCarryBd.value(i).get
    iter.map { row =>
      if (!notMissing(row))
        FooBar(lastNotNullRow.foo, row.bar)
      else {
        lastNotNullRow = row
        row
      }
    }
  }

  // The algorithm does not step into the for loop for filling the null values. Strange
  val imputed: RDD[FooBar] = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) }
  val imputedDF = imputed.toDS()

  println(imputedDF.orderBy($"foo").collect.toList)
  imputedDF.show
  spark.stop
}

EDIT

I fixed the code as outlined in the comments. But toCarryBd contains None values. How can this happen when I filter explicitly via

def notMissing(row: FooBar): Boolean = {row.foo != null} 
iter.filter(notMissing(_)).toSeq.lastOption 

so that no None values should remain? Yet the carry map contains:

(2,None) 
(5,None) 
(4,None) 
(7,Some(FooBar(2016-01-04,lastAssumingSameDate))) 
(1,Some(FooBar(2016-01-01,first))) 
(3,Some(FooBar(2016-01-02,second))) 
(6,None) 
(0,None) 

Trying to access toCarryBd then results in NoSuchElementException: None.get
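These None entries can be reproduced without Spark: with local[*] the RDD typically gets as many partitions as there are cores (eight here), so most partitions receive no rows at all, and lastOption over an empty partition is None. A minimal plain-Scala sketch of the carry computation (dates kept as strings and the partition assignment invented purely for illustration):

```scala
// Plain-Scala model of the per-partition carry computation (no Spark).
// FooBar here uses Option[String] instead of java.sql.Date for brevity.
case class FooBar(foo: Option[String], bar: String)

val rows = Seq(
  FooBar(Some("2016-01-01"), "first"),
  FooBar(Some("2016-01-02"), "second"),
  FooBar(None,               "noValidFormat"),
  FooBar(Some("2016-01-04"), "lastAssumingSameDate"))

// 4 rows spread over 8 partitions: row j lands in partition 2*j + 1,
// so partitions 0, 2, 4, 6 stay empty (illustrative assignment only).
val partitions: Map[Int, Seq[FooBar]] =
  (0 until 8).map(i => i -> rows.zipWithIndex.collect {
    case (r, j) if 2 * j + 1 == i => r
  }).toMap

// Mirrors toCarry: last valid row per partition; empty partitions give None,
// and so does partition 5, whose only row has no valid date.
val toCarry: Map[Int, Option[FooBar]] =
  partitions.map { case (i, rs) => i -> rs.filter(_.foo.isDefined).lastOption }

toCarry.toSeq.sortBy(_._1).foreach(println)
```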

Answer


First, if your foo field can be null, I would suggest creating the case class as:

case class FooBar(foo: Option[Date], bar: String) 

Then you can rewrite your notMissing function like this:

def notMissing(row: Option[FooBar]): Boolean = row.isDefined && row.get.foo.isDefined 
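With the Option-based case class, fill can also be made safe against partitions whose carry is None by walking back to the nearest earlier partition that has one. The following is a plain-Scala sketch of that idea, not the answerer's code; the string dates, the hand-built carry map, and the helper names carryFor and fill are assumptions for illustration:

```scala
// Sketch: None-safe forward fill across partitions (no Spark needed here).
case class FooBar(foo: Option[String], bar: String)

def notMissing(row: FooBar): Boolean = row.foo.isDefined

// Example carry map as produced per partition; empty partitions map to None.
val toCarry: Map[Int, Option[FooBar]] = Map(
  0 -> None,
  1 -> Some(FooBar(Some("2016-01-01"), "first")),
  2 -> None,
  3 -> Some(FooBar(Some("2016-01-02"), "second")))

// Nearest earlier partition with a value, so fill never calls .get on a None.
def carryFor(i: Int): Option[FooBar] =
  ((i - 1) to 0 by -1).collectFirst { case j if toCarry(j).isDefined => toCarry(j).get }

def fill(i: Int, iter: Iterator[FooBar]): Iterator[FooBar] = {
  var last: Option[FooBar] = carryFor(i)
  iter.map { row =>
    if (notMissing(row)) { last = Some(row); row }
    else last.map(l => row.copy(foo = l.foo)).getOrElse(row)
  }
}
```

In a real job, carryFor would read from the broadcast toCarryBd.value instead of a local Map; the backwards scan is what avoids the NoSuchElementException on empty partitions.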

Can you explain why the map produces so many 'None' entries? –


@GeorgHeiler If the Seq is empty, `iter.filter(notMissing(_)).toSeq.lastOption` will return None. –


Thanks. But why does it execute 8 times when the original df only contains 4 rows? –