2016-10-16 51 views
4

我有上面的代碼作爲spark驅動程序,當我執行我的程序時,它可以正常工作將所需數據保存爲parquet文件。Spark java地圖函數執行兩次

 String indexFile = "index.txt"; 
     JavaRDD<String> indexData = sc.textFile(indexFile).cache(); 
     JavaRDD<String> jsonStringRDD = indexData.map(new Function<String, String>() { 
     @Override 
     public String call(String patientId) throws Exception { 
     return "json array as string" 
     } 
     }); 

//1. Read json string array into a Dataframe (execution 1) 
     DataFrame dataSchemaDF = sqlContext.read().json(jsonStringRDD); 
//2. Save dataframe as parquet file (execution 2) 
     dataSchemaDF.write().parquet("md.parquet"); 

但我觀察到我在RDD indexData上的映射函數正在執行兩次。 第一,當我讀到jsonStringRdd如使用SQLContextDataFrame,當我寫的dataSchemaDF到拼花文件

你可以指導我在這,如何避免這種重複執行?有沒有其他更好的方法將json字符串轉換爲Dataframe?

+0

你在哪裏看到兩張地圖? RDD's被懶惰地評估。 'map'操作是一個轉換,而不是一個操作,所以'jsonStringRDD'的分配不應該立即運行。也許用於讀取數據框和寫入鑲木地板的執行路徑都需要收集RDD。 –

+0

我在mapper函數中有日誌語句,我在日誌中看到它們兩次。 – blob

回答

6

我相信原因是JSON閱讀器缺少模式。執行時:

sqlContext.read().json(jsonStringRDD); 

Spark必須推斷新創建的DataFrame的模式。要做到這一點它具有掃描輸入RDD,如果你想避免它急切地進行

這一步,你必須創建一個StructType它描述了JSON文件的形狀:

​​

,並使用它時,你創建DataFrame

DataFrame dataSchemaDF = sqlContext.read().schema(schema).json(jsonStringRDD);