2016-08-12 91 views
1

最近升級到2.0星火,並試圖創建JSON字符串一個簡單的數據集時,我看到一些奇怪的行爲。這裏有一個簡單的測試案例:爲什麼SparkSession爲一個動作執行兩次?

SparkSession spark = SparkSession.builder().appName("test").master("local[1]").getOrCreate(); 
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); 

JavaRDD<String> rdd = sc.parallelize(Arrays.asList(
      "{\"name\":\"tom\",\"title\":\"engineer\",\"roles\":[\"designer\",\"developer\"]}", 
      "{\"name\":\"jack\",\"title\":\"cto\",\"roles\":[\"designer\",\"manager\"]}" 
     )); 

JavaRDD<String> mappedRdd = rdd.map(json -> { 
    System.out.println("mapping json: " + json); 
    return json; 
}); 

Dataset<Row> data = spark.read().json(mappedRdd); 
data.show(); 

和輸出:

mapping json: {"name":"tom","title":"engineer","roles":["designer","developer"]} 
mapping json: {"name":"jack","title":"cto","roles":["designer","manager"]} 
mapping json: {"name":"tom","title":"engineer","roles":["designer","developer"]} 
mapping json: {"name":"jack","title":"cto","roles":["designer","manager"]} 
+----+--------------------+--------+ 
|name|    roles| title| 
+----+--------------------+--------+ 
| tom|[designer, develo...|engineer| 
|jack| [designer, manager]|  cto| 
+----+--------------------+--------+ 

似乎正在執行的「地圖」功能的兩倍,即使我只執行一個動作。我認爲Spark會懶惰地構建一個執行計劃,然後在需要時執行它,但這看起來似乎爲了以JSON的形式讀取數據並對其執行任何操作,該計劃必須至少執行兩次。

在這個簡單的例子沒關係,但當地圖功能是長時間運行,這將成爲一個大問題。這是對的,還是我錯過了什麼?

回答

2

它發生,因爲你不爲DataFrameReader提供架構。因此Spark必須急切地掃描數據集來推斷輸出模式。

由於mappedRdd是不緩存它會被兩次評估:

  • 一次架構推斷
  • 一旦當你調用data.show

如果你想阻止你應該爲讀者提供架構(Scala的語法):

val schema: org.apache.spark.sql.types.StructType = ??? 
spark.read.schema(schema).json(mappedRdd)