我在本地模式下使用Spark 2.1,並運行這個簡單的應用程序。Apache Spark - shuffle寫入比輸入數據大小更多的數據
val N = 10 << 20
sparkSession.conf.set("spark.sql.shuffle.partitions", "5")
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", (N + 1).toString)
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")
val df1 = sparkSession.range(N).selectExpr(s"id as k1")
val df2 = sparkSession.range(N/5).selectExpr(s"id * 3 as k2")
df1.join(df2, col("k1") === col("k2")).count()
在此,範圍(N)產生的龍(具有唯一值),所以我假定一個數據集
- DF1 = N * 8個字節〜80MB的大小
- DF2 = N/5 * 8個字節〜16MB
確定現在讓我們噸以ake df1爲例。 DF1由8個分區和5shuffledRDDs,所以我假定
- #映射器(M)= 8
- #減速器的(R)= 5
由於分區數低,Spark將使用Hash Shuffle,它將在磁盤中創建M * R文件,但是我ha不知道每個文件是否具有全部數據,因此each_file_size = data_size導致M * R * data_size文件或all_files = data_size。
然而,當執行這個應用程序,洗牌寫df1 = 160MB它不符合上述任何一種情況。
缺少什麼我在這裏?爲什麼洗牌寫入數據的大小增加了一倍?