2017-01-04 69 views
1

如何處理空分區mapPartitionsWithIndex火花mapPartitionsWithIndex處理空分區

完整的例子可以發現:https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2

我的目標是通過RDD的一個已知的完好價值的Spark/Scala: fill nan with last good observation的改進,以填補NaN值。

但一些分區不包含任何值:

###################### carry 
Map(2 -> None, 5 -> None, 4 -> None, 7 -> Some(FooBar(2016-01-04,lastAssumingSameDate)), 1 -> Some(FooBar(2016-01-01,first)), 3 -> Some(FooBar(2016-01-02,second)), 6 -> None, 0 -> None) 
(2,None) 
(5,None) 
(4,None) 
(7,Some(FooBar(2016-01-04,lastAssumingSameDate))) 
(1,Some(FooBar(2016-01-01,first))) 
(3,Some(FooBar(2016-01-02,second))) 
(6,None) 
(0,None) 
() 
###################### carry 

case class FooBar(foo: Option[Date], bar: String) 
val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"), 
    ("2016-wrongFormat", "noValidFormat"), 
    ("2016-01-04", "lastAssumingSameDate")) 
    .toDF("foo", "bar") 
    .withColumn("foo", 'foo.cast("Date")) 
    .as[FooBar] 
def notMissing(row: Option[FooBar]): Boolean = row.isDefined && row.get.foo.isDefined 
myDf.rdd.filter(x => notMissing(Some(x))).count 
val toCarry: Map[Int, Option[FooBar]] = myDf.rdd.mapPartitionsWithIndex { case (i, iter) => Iterator((i, iter.filter(x => notMissing(Some(x))).toSeq.lastOption)) }.collectAsMap 

當使用

val toCarryBd = spark.sparkContext.broadcast(toCarry) 
def fill(i: Int, iter: Iterator[FooBar]): Iterator[FooBar] = { 
    if (iter.isEmpty) { 
     iter 
    } else { 
     var lastNotNullRow: Option[FooBar] = toCarryBd.value.get(i).get 
     iter.map(foo => { 
     println("original ", foo) 
     if (!notMissing(Some(foo))) { 
      println("replaced") 
      // this will go into the default case 
      // FooBar(lastNotNullRow.getOrElse(FooBar(Option(Date.valueOf("2016-01-01")), "DUMMY")).foo, foo.bar) 
      FooBar(lastNotNullRow.get.foo, foo.bar) // TODO warning this throws an error 
     } else { 
      lastNotNullRow = Some(foo) 
      foo 
     } 
     }) 
    } 
    } 

    val imputed: RDD[FooBar] = myDf.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) } 

填補它會崩潰的值。

編輯

如果從答案中應用輸入,則輸出。還沒100%有

+----------+--------------------+ 
|  foo|     bar| 
+----------+--------------------+ 
|2016-01-01|    first| 
|2016-01-02|    second| 
|2016-01-04|  noValidFormat| 
|2016-01-04|lastAssumingSameDate| 
+----------+--------------------+ 

回答

1

至於工作mapPartitions(以及類似)處理時,空分區,一般的方法是,當你有一個空的輸入迭代器返回正確類型的空迭代。

它看起來像你的代碼是這樣做的,但是它看起來像你的應用程序邏輯中可能有一個錯誤(即它假定如果一個分區有一個記錄缺少一個值,它將有一個前一行相同的分區,這是好的,或者以前的分區不是空的並且具有好的行 - 這不一定是這種情況)。你已經部分解決了這個問題,並且對於每個分區收集最後一個好值,然後如果在分區開始時沒有很好的值,請查看收集數組中的值。

但是,如果這也發生在同一時間以前的分區爲空,您將需要去查找以前的分區值,直到找到您正在查找的分區值。 (請注意,假定數據集中的第一條記錄是有效的,如果不是,您的代碼仍然會失敗)。

您的解決方案非常接近工作,但只是有一些小的假設,並不總是必要的。

+0

感謝您的評論。這有助於我填補下一個,但不是最後一個已知的價值。 –

+0

對,你只需要向後搜索做最後一次已知的好事。 – Holden

+0

你的意思是代替+1 a -1:'while(lastNotNullRow == None){last_NotNullRow = toCarryBd.value.get(i + 1).get }'?但是如果第一個分區是空的,那麼這將不起作用(在這種情況下),我認爲替換映射已經按照正確的順序。 –