2017-05-18 75 views
3

我在本地模式下使用Spark 2.1,並運行這個簡單的應用程序。Apache Spark - shuffle寫入比輸入數據大小更多的數據

val N = 10 << 20 

sparkSession.conf.set("spark.sql.shuffle.partitions", "5") 
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", (N + 1).toString) 
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false") 

val df1 = sparkSession.range(N).selectExpr(s"id as k1") 
val df2 = sparkSession.range(N/5).selectExpr(s"id * 3 as k2") 

df1.join(df2, col("k1") === col("k2")).count() 

在此,範圍(N)產生的(具有唯一值),所以我假定一個數據集

  • DF1 = N * 8個字節〜80MB的大小
  • DF2 = N/5 * 8個字節〜16MB

確定現在讓我們噸以ake df1爲例。 DF1由8個分區和5shuffledRDDs,所以我假定

  • #映射器(M)= 8
  • #減速器的(R)= 5

由於分區數低,Spark將使用Hash Shuffle,它將在磁盤中創建M * R文件,但是我ha不知道每個文件是否具有全部數據,因此each_file_size = data_size導致M * R * data_size文件或all_files = data_size

然而,當執行這個應用程序,洗牌寫df1 = 160MB它不符合上述任何一種情況。

Spark UI

缺少什麼我在這裏?爲什麼洗牌寫入數據的大小增加了一倍?

回答

3

首先,讓我們看看data size total(min, med, max)意味着:

根據SQLMetrics.scala#L88ShuffleExchange.scala#L43,我們看到data size total(min, med, max)dataSize指標洗牌的最終值。那麼,它是如何更新的?每次記錄被序列化時它都會更新:UnsafeRowSerializer.scala#L66 by dataSize.add(row.getSizeInBytes)UnsafeRow是Spark SQL中記錄的內部表示)。

在內部,UnsafeRowbyte[]的支持,和序列化期間被直接複製到基本輸出流,其getSizeInBytes()方法只返回byte[]的長度。因此,最初的問題轉變爲:爲什麼字節表示是記錄所具有的唯一long列的兩倍?此UnsafeRow.scala文檔給我們的答案:

每個元組具有三個部分:[空比特集合] [值] [可變長度部分]

比特集用於空跟蹤並且被對準到8字節邊界。它存儲每場一位。

由於它是8字節字對齊的,所以只有1個空位需要另外8個字節,寬度與長列相同。因此,每個UnsafeRow代表使用16個字節的一個長列行。