-1
我想利用已知觀測最後一次正確的填寫火花NaN值 - 見:Spark/Scala: fill nan with last good observation火花地圖分區填寫NaN值
我目前的解決方案中使用的窗函數,以完成任務。但是這並不好,因爲所有的值都映射到一個單獨的分區。 val imputed: RDD[FooBar] = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) }
應該會更好。但奇怪的是我的fill
函數沒有執行。我的代碼有什麼問題?
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
| null| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
以下是完整的示例代碼:
import java.sql.Date
import org.apache.log4j.{ Level, Logger }
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
case class FooBar(foo: Date, bar: String)
object WindowFunctionExample extends App {
Logger.getLogger("org").setLevel(Level.WARN)
val conf: SparkConf = new SparkConf()
.setAppName("foo")
.setMaster("local[*]")
val spark: SparkSession = SparkSession
.builder()
.config(conf)
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
val myDff = Seq(("2016-01-01", "first"), ("2016-01-02", "second"),
("2016-wrongFormat", "noValidFormat"),
("2016-01-04", "lastAssumingSameDate"))
val recordsDF = myDff
.toDF("foo", "bar")
.withColumn("foo", 'foo.cast("Date"))
.as[FooBar]
recordsDF.show
def notMissing(row: FooBar): Boolean = {
row.foo != null
}
val toCarry = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }.collectAsMap
println("###################### carry ")
println(toCarry)
println(toCarry.foreach(println))
println("###################### carry ")
val toCarryBd = spark.sparkContext.broadcast(toCarry)
def fill(i: Int, iter: Iterator[FooBar]): Iterator[FooBar] = {
var lastNotNullRow: FooBar = toCarryBd.value(i).get
iter.map(row => {
if (!notMissing(row))1
FooBar(lastNotNullRow.foo, row.bar)
else {
lastNotNullRow = row
row
}
})
}
// The algorithm does not step into the for loop for filling the null values. Strange
val imputed: RDD[FooBar] = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) }
val imputedDF = imputed.toDS()
println(imputedDF.orderBy($"foo").collect.toList)
imputedDF.show
spark.stop
}
編輯
我固定的代碼被評論所概述。但toCarryBd
包含None
值。這是怎樣發生像我一樣過濾明確的
def notMissing(row: FooBar): Boolean = {row.foo != null}
iter.filter(notMissing(_)).toSeq.lastOption
非None
值。
(2,None)
(5,None)
(4,None)
(7,Some(FooBar(2016-01-04,lastAssumingSameDate)))
(1,Some(FooBar(2016-01-01,first)))
(3,Some(FooBar(2016-01-02,second)))
(6,None)
(0,None)
試圖訪問toCarryBd
時,這會導致NoSuchElementException: None.get
。
你能解釋爲什麼地圖會產生大量的「無」條目? –
@GeorgHeiler如果Seq爲空,則iter.filter(notMissing(_)).toSeq.lastOption'將不返回任何值。 –
謝謝。爲什麼當原始df僅包含4行時,它會執行8次? –