1
我正面臨來自Spark的奇怪行爲。這裏是我的代碼:Spark正在複製工作
object MyJob {
def main(args: Array[String]): Unit = {
val sc = new SparkContext()
val sqlContext = new hive.HiveContext(sc)
val query = "<Some Hive Query>"
val rawData = sqlContext.sql(query).cache()
val aggregatedData = rawData.groupBy("group_key")
.agg(
max("col1").as("max"),
min("col2").as("min")
)
val redisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))
aggregatedData.foreachPartition {
rows =>
writePartitionToRedis(rows, redisConfig)
}
aggregatedData.write.parquet(s"/data/output.parquet")
}
}
針對我的直覺,spark調度器爲每個數據接收器(Redis,HDFS/Parquet)產生兩個作業。問題是第二份工作是執行蜂房查詢並使工作翻倍。我假設兩個寫操作都會共享aggregatedData
階段的數據。有什麼不對或者它的行爲有什麼期望?
我正在緩存,但在'rawData'步驟。我仍然在第二份工作中看到這一步的一些任務。我如何確定第二份工作是從緩存中獲取其階段數據並且不執行不必要的工作? –
是的,你是cachine原始數據,但不是aggregatedData,所以需要在每次迭代時重新評估。如果你進入Spark應用程序的用戶界面你會看到你的應用程序正在做什麼的所有數據。如果不嘗試序列化或緩存到磁盤(使用'persist'和適當的存儲級別),請確保緩存的數據適合內存。 – puhlen