2016-04-15 35 views
3

我在火花流應用程序中廣播一個值。但是我不確定如何在不同的類中訪問該變量,而不是廣播它的類。訪問不同類別的Spark廣播變量

我的代碼如下:

object AppMain{ 
    def main(args: Array[String]){ 
    //... 
    val broadcastA = sc.broadcast(a) 
    //.. 
    lines.foreachRDD(rdd => { 
    val obj = AppObject1 
    rdd.filter(p => obj.apply(p)) 
    rdd.count 
    } 
} 

object AppObject1: Boolean{ 
    def apply(str: String){ 
    AnotherObject.process(str) 
    } 
} 
object AnotherObject{ 
    // I want to use broadcast variable in this object 
    val B = broadcastA.Value // compilation error here 
    def process(): Boolean{ 
    //need to use B inside this method 
    } 
} 

任何人都可以提出如何訪問在這種情況下廣播的變量?

回答

4

沒有什麼特別的Spark在這裏忽略了可能的序列化問題。如果你想使用一些對象時,它在當前範圍內可用,你可以做到這一點以同樣的方式和往常一樣:

  • 你可以在一個範圍內定義你的助手,其中廣播已被定義:

    { 
        ... 
        val x = sc.broadcast(1) 
        object Foo { 
         def foo = x.value 
        } 
        ... 
    } 
    
  • ,你可以使用它作爲一個構造函數參數:

    case class Foo(x: org.apache.spark.broadcast.Broadcast[Int]) { 
        def foo = x.value 
    } 
    
    ... 
    
    Foo(sc.broadcast(1)).foo 
    
  • 方法參數

    case class Foo() { 
        def foo(x: org.apache.spark.broadcast.Broadcast[Int]) = x.value 
    } 
    
    ... 
    
    Foo().foo(sc.broadcast(1)) 
    
  • 甚至混合在你的助手這樣的:

    trait Foo { 
        val x: org.apache.spark.broadcast.Broadcast[Int] 
        def foo = x.value 
    } 
    
    object Main extends Foo { 
        val sc = new SparkContext("local", "test", new SparkConf()) 
        val x = sc.broadcast(1) 
    
        def main(args: Array[String]) { 
        sc.parallelize(Seq(None)).map(_ => foo).first 
        sc.stop 
        } 
    } 
    
+2

在將廣播變量傳遞給f時有什麼性能影響聯合參數(例如,在咖喱地圖函數中:'map(bcast)(row)')?典型的Spark示例總是在使用它的函數的相同範圍內實例化一個廣播變量。如果您想將地圖功能移出作用域但仍然引用廣播變量,該怎麼辦? – iralls

+0

@ zero323通過其他方法傳遞廣播或廣播值直到達到它的使用位置是一個壞主意?就像如果我有一個在main中創建的run方法,我將broadcast.value(在你的例子中是「foo」)傳遞給.flatmap?在驅動程序中創建廣播還是可以在放入平面圖的函數中創建它? – SparkleGoat

0

,你可以使用類並通過廣播變量類

您psudo代碼應該是這樣的:

object AppMain{ 
    def main(args: Array[String]){ 
    //... 
    val broadcastA = sc.broadcast(a) 
    //.. 
    lines.foreach(rdd => { 
     val obj = new AppObject1(broadcastA) 
     rdd.filter(p => obj.apply(p)) 
     rdd.count 
    }) 
    } 
} 

class AppObject1(bc : Broadcast[String]){ 
    val anotherObject = new AnotherObject(bc) 
    def apply(str: String): Boolean ={ 
     anotherObject.process(str) 
    } 
} 

class AnotherObject(bc : Broadcast[String]){ 
    // I want to use broadcast variable in this object 
    def process(str : String): Boolean = { 
    val a = bc.value 
    true 
    //need to use B inside this method 
    } 
}