2016-03-02 77 views
2

我有一個關於Java Spark應用程序中代碼結構的通用問題。我想要將用於實現Spark轉換的代碼與RDD上的調用分開,因此即使在使用大量包含大量代碼行的轉換時,應用程序的源代碼仍然保持清晰。Apache Spark:如何構造Spark應用程序的代碼(特別是在使用廣播時)

我先給你一個簡短的例子。在這種情況下,flatMap轉換的實現是作爲匿名內部類提供的。這是一個簡單的應用程序,讀取整數的RDD,然後乘以每個元素爲一個整數陣列,它被廣播給所有的工作節點之前:

public static void main(String[] args) { 

    SparkConf conf = new SparkConf().setMaster("local").setAppName("MyApp"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 

    JavaRDD<Integer> result = sc.parallelize(Arrays.asList(5, 8, 9)); 

    final Broadcast<int[]> factors = sc.broadcast(new int[] { 1, 2, 3 }); 

    result = result.flatMap(new FlatMapFunction<Integer, Integer>() { 
     public Iterable<Integer> call(Integer t) throws Exception { 
      int[] values = factors.value(); 
      LinkedList<Integer> result = new LinkedList<Integer>(); 
      for (int value : values) result.add(t * value); 
      return result; 
     } 
    }); 

    System.out.println(result.collect()); // [5, 10, 15, 8, 16, 24, 9, 18, 27] 

    sc.close(); 
} 

爲了構建代碼我已提取的火花功能的實現到不同的班級。類SparkFunctions爲flatMap轉換提供了實現,並且有一個setter方法來獲得對廣播變量的引用(...在我的真實世界場景中,這個類中有許多操作都可以訪問廣播的數據)。

我已經體會到,表示Spark轉換的方法只要不訪問Broadcast變量或Accumulator變量就可以是靜態的。爲什麼?靜態方法只能訪問靜態屬性。對廣播變量的靜態引用始終爲null(可能因爲Spark在將類SparkFunctions發送到工作節點時未對其進行序列化)。

@SuppressWarnings("serial") 
public class SparkFunctions implements Serializable { 

    private Broadcast<int[]> factors; 

    public SparkFunctions() { 
    } 

    public void setFactors(Broadcast<int[]> factors) { 
     this.factors = factors; 
    } 

    public final FlatMapFunction<Integer, Integer> myFunction = new FlatMapFunction<Integer, Integer>() { 
     public Iterable<Integer> call(Integer t) throws Exception { 
      int[] values = factors.value(); 
      LinkedList<Integer> result = new LinkedList<Integer>(); 
      for (int value : values) result.add(t * value); 
      return result; 
     } 
    }; 

} 

這是使用類SparkFunctions應用程序的第二個版本:

public static void main(String[] args) { 

    SparkConf conf = new SparkConf().setMaster("local").setAppName("MyApp"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 

    JavaRDD<Integer> result = sc.parallelize(Arrays.asList(5, 8, 9)); 

    final Broadcast<int[]> factors = sc.broadcast(new int[] { 1, 2, 3 }); 

    // 1) Initializing 
    SparkFunctions functions = new SparkFunctions(); 

    // 2) Pass reference of broadcast variable 
    functions.setFactors(factors); 

    // 3) Implementation is now in the class SparkFunctions 
    result = result.flatMap(functions.myFunction); 

    System.out.println(result.collect()); // [5, 10, 15, 8, 16, 24, 9, 18, 27] 

    sc.close(); 
} 

應用程序的兩個版本都工作(本地及集羣設置),但我問他們是否也同樣高效。

問題1:在我看來,Spark將包含廣播變量的類SparkFunctions序列化併發送給工作節點,以便節點可以在其任務中使用該函數。是數據發送兩次到工作節點,首先使用SparkContext廣播,然後再上一次序列化的類SparkFunctions?還是它甚至每個元素髮送一次(加上廣播1)?

問題2:您能否提供關於源代碼如何結構化的建議?

請不要提供解決方案如何防止廣播。我有一個更復雜的真實應用程序。

,我也發現了類似問題,這是不是真的有幫助:

預先感謝您的幫助!

回答

0

這是關於問題1

當火花提交作業,作業分爲滑臺>任務。這些任務實際上執行了工作節點上的轉換和操作。驅動程序的sumbitTask()會將有關廣播變量的函數和元數據序列化到所有節點。

解析廣播的工作原理。

驅動程序創建本地目錄以存儲要廣播的數據並啓動可訪問目錄的HttpServer。當調用廣播時,數據實際上被寫入目錄(val bdata = sc.broadcast(data))。與此同時,數據也被寫入具有StorageLevel內存+磁盤的驅動程序blockManger中。塊管理器爲數據分配一個blockId(類型爲BroadcastBlockId)。

只有在執行程序對接收到的任務進行反序列化時,纔會廣播真實數據,並以廣播對象的形式獲取廣播變量的元數據。然後它調用元數據對象(bdata變量)的readObject()方法。此方法將首先檢查本地塊管理器以查看是否已有本地副本。否則,數據將從驅動程序中獲取。一旦數據被提取,它就存儲在本地塊管理器中供以後使用。

+0

我想你已經回答了我的問題。從主方法給予SparkFunctions類的Broadcast對象只包含元數據而不包含數據本身。那麼這個解決方案應該有效地工作謝謝! – Andreas

相關問題