2016-04-25 84 views
3

我所看到的Spark廣播變量的所有示例在使用它們的函數範圍(map(),join()等)中定義了它們。我想同時使用引用廣播變量的map()函數和mapPartitions()函數,但我想對它們進行模塊化,以便我可以使用相同的函數進行單元測試。如何在範圍之外引用Spark廣播變量

  • 我該如何做到這一點?

甲以爲我已經是如此,我使用一個mapmapPartitions呼叫時傳遞給所述廣播變量的引用咖喱功能。

  • 通過傳遞對定義原始範圍內的函數時通常找不到的廣播變量的引用,是否會有任何性能影響?

我有這樣的事情記(僞代碼):

// firstFile.scala 
// --------------- 

def mapper(bcast: Broadcast)(row: SomeRow): Int = { 
    bcast.value(row._1) 
} 

def mapMyPartition(bcast: Broadcast)(iter: Iterator): Iterator { 
    val broadcastVariable = bcast.value 

    for { 
    i <- iter 
    } yield broadcastVariable(i) 
}) 


// secondFile.scala 
// ---------------- 

import firstFile.{mapMyPartition, mapper} 

val bcastVariable = sc.broadcast(Map(0 -> 1, 1 -> 2, 2 -> 3)) 

rdd 
.map(mapper(bcastVariable)) 
.mapPartitions(mapMyPartition(bcastVariable)) 

回答

2

您的解決方案應該能正常運行。在這兩種情況下,傳遞到map{Partitions}的函數將包含對串行化時的廣播變量本身的引用,但不包含其值,並且只在節點上計算時調用bcast.value

有什麼需要避免的是一樣的東西

def mapper(bcast: Broadcast): SomeRow => Int = { 
    val value = bcast.value 
    row => value(row._1) 
} 
1

您正確地做這個。你只需要記住傳遞廣播參考而不是價值本身。使用您的示例的差別可能會顯示如下:

一)有效的方式:

// the whole Map[Int, Int] is serialized and sent to every worker 
val bcastVariable = sc.broadcast(Map(0 -> 1, 1 -> 2, 2 -> 3)) 

rdd 
.map(mapper(bcastVariable)) // only the reference to the Map[Int, Int] is serialized and sent to every worker 
.mapPartitions(mapMyPartition(bcastVariable)) // only the reference to the Map[Int, Int] is serialized and sent to every worker 

B)低效的方式:

// the whole Map[Int, Int] is serialized and sent to every worker 
val bcastVariable = sc.broadcast(Map(0 -> 1, 1 -> 2, 2 -> 3)) 

rdd 
.map(mapper(bcastVariable.value)) // the whole Map[Int, Int] is serialized and sent to every worker 
.mapPartitions(mapMyPartition(bcastVariable.value)) // the whole Map[Int, Int] is serialized and sent to every worker 

當然在第二個例子中mappermapMyPartition將有簽名略有不同。

相關問題