2016-12-22 28 views
1

我正面臨來自Spark的奇怪行爲。這裏是我的代碼:Spark正在複製工作

object MyJob { 
    def main(args: Array[String]): Unit = { 
     val sc = new SparkContext() 
     val sqlContext = new hive.HiveContext(sc) 

     val query = "<Some Hive Query>" 
     val rawData = sqlContext.sql(query).cache() 

     val aggregatedData = rawData.groupBy("group_key") 
      .agg(
       max("col1").as("max"), 
       min("col2").as("min") 
      ) 

     val redisConfig = new RedisConfig(new RedisEndpoint(sc.getConf)) 
     aggregatedData.foreachPartition { 
      rows => 
       writePartitionToRedis(rows, redisConfig) 
     } 

     aggregatedData.write.parquet(s"/data/output.parquet") 
    } 
} 

針對我的直覺,spark調度器爲每個數據接收器(Redis,HDFS/Parquet)產生兩個作業。問題是第二份工作是執行蜂房查詢並使工作翻倍。我假設兩個寫操作都會共享aggregatedData階段的數據。有什麼不對或者它的行爲有什麼期望?

回答

0

你錯過了火花的基本概念:懶惰。

RDD不包含任何數據,它只是一組指令,當您調用某個操作(如將數據寫入磁盤/ hdfs)時,這些指令將被執行。如果您重新使用RDD(或Dataframe),則不存儲存儲的數據,只存儲每次調用操作時需要評估的指令。

如果你想,而無需重新評估的RDD,使用.cache()或最好persist重用數據。持久化RDD允許您存儲轉換的結果,以便在將來的迭代中不需要重新評估RDD。

+0

我正在緩存,但在'rawData'步驟。我仍然在第二份工作中看到這一步的一些任務。我如何確定第二份工作是從緩存中獲取其階段數據並且不執行不必要的工作? –

+0

是的,你是cachine原始數據,但不是aggregatedData,所以需要在每次迭代時重新評估。如果你進入Spark應用程序的用戶界面你會看到你的應用程序正在做什麼的所有數據。如果不嘗試序列化或緩存到磁盤(使用'persist'和適當的存儲級別),請確保緩存的數據適合內存。 – puhlen