我有一個關於Java Spark應用程序中代碼結構的通用問題。我想要將用於實現Spark轉換的代碼與RDD上的調用分開,因此即使在使用大量包含大量代碼行的轉換時,應用程序的源代碼仍然保持清晰。Apache Spark:如何構造Spark應用程序的代碼(特別是在使用廣播時)
我先給你一個簡短的例子。在這種情況下,flatMap轉換的實現是作爲匿名內部類提供的。這是一個簡單的應用程序,讀取整數的RDD,然後乘以每個元素爲一個整數陣列,它被廣播給所有的工作節點之前:
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("MyApp");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Integer> result = sc.parallelize(Arrays.asList(5, 8, 9));
final Broadcast<int[]> factors = sc.broadcast(new int[] { 1, 2, 3 });
result = result.flatMap(new FlatMapFunction<Integer, Integer>() {
public Iterable<Integer> call(Integer t) throws Exception {
int[] values = factors.value();
LinkedList<Integer> result = new LinkedList<Integer>();
for (int value : values) result.add(t * value);
return result;
}
});
System.out.println(result.collect()); // [5, 10, 15, 8, 16, 24, 9, 18, 27]
sc.close();
}
爲了構建代碼我已提取的火花功能的實現到不同的班級。類SparkFunctions
爲flatMap轉換提供了實現,並且有一個setter方法來獲得對廣播變量的引用(...在我的真實世界場景中,這個類中有許多操作都可以訪問廣播的數據)。
我已經體會到,表示Spark轉換的方法只要不訪問Broadcast變量或Accumulator變量就可以是靜態的。爲什麼?靜態方法只能訪問靜態屬性。對廣播變量的靜態引用始終爲null
(可能因爲Spark在將類SparkFunctions
發送到工作節點時未對其進行序列化)。
@SuppressWarnings("serial")
public class SparkFunctions implements Serializable {
private Broadcast<int[]> factors;
public SparkFunctions() {
}
public void setFactors(Broadcast<int[]> factors) {
this.factors = factors;
}
public final FlatMapFunction<Integer, Integer> myFunction = new FlatMapFunction<Integer, Integer>() {
public Iterable<Integer> call(Integer t) throws Exception {
int[] values = factors.value();
LinkedList<Integer> result = new LinkedList<Integer>();
for (int value : values) result.add(t * value);
return result;
}
};
}
這是使用類SparkFunctions
應用程序的第二個版本:
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("MyApp");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Integer> result = sc.parallelize(Arrays.asList(5, 8, 9));
final Broadcast<int[]> factors = sc.broadcast(new int[] { 1, 2, 3 });
// 1) Initializing
SparkFunctions functions = new SparkFunctions();
// 2) Pass reference of broadcast variable
functions.setFactors(factors);
// 3) Implementation is now in the class SparkFunctions
result = result.flatMap(functions.myFunction);
System.out.println(result.collect()); // [5, 10, 15, 8, 16, 24, 9, 18, 27]
sc.close();
}
應用程序的兩個版本都工作(本地及集羣設置),但我問他們是否也同樣高效。
問題1:在我看來,Spark將包含廣播變量的類SparkFunctions
序列化併發送給工作節點,以便節點可以在其任務中使用該函數。是數據發送兩次到工作節點,首先使用SparkContext
廣播,然後再上一次序列化的類SparkFunctions
?還是它甚至每個元素髮送一次(加上廣播1)?
問題2:您能否提供關於源代碼如何結構化的建議?
請不要提供解決方案如何防止廣播。我有一個更復雜的真實應用程序。
,我也發現了類似問題,這是不是真的有幫助:
- Spark Java Code Structure
- BroadCast Variables In Spark
- Spark: passing broadcast variable to executors
預先感謝您的幫助!
我想你已經回答了我的問題。從主方法給予SparkFunctions類的Broadcast對象只包含元數據而不包含數據本身。那麼這個解決方案應該有效地工作謝謝! – Andreas