我所看到的Spark廣播變量的所有示例在使用它們的函數範圍(map()
,join()
等)中定義了它們。我想同時使用引用廣播變量的map()
函數和mapPartitions()
函數,但我想對它們進行模塊化,以便我可以使用相同的函數進行單元測試。如何在範圍之外引用Spark廣播變量
- 我該如何做到這一點?
甲以爲我已經是如此,我使用一個map
或mapPartitions
呼叫時傳遞給所述廣播變量的引用咖喱功能。
- 通過傳遞對定義原始範圍內的函數時通常找不到的廣播變量的引用,是否會有任何性能影響?
我有這樣的事情記(僞代碼):
// firstFile.scala
// ---------------
def mapper(bcast: Broadcast)(row: SomeRow): Int = {
bcast.value(row._1)
}
def mapMyPartition(bcast: Broadcast)(iter: Iterator): Iterator {
val broadcastVariable = bcast.value
for {
i <- iter
} yield broadcastVariable(i)
})
// secondFile.scala
// ----------------
import firstFile.{mapMyPartition, mapper}
val bcastVariable = sc.broadcast(Map(0 -> 1, 1 -> 2, 2 -> 3))
rdd
.map(mapper(bcastVariable))
.mapPartitions(mapMyPartition(bcastVariable))