我是火花新手,使用有一個簡單的火花申請星火SQL/hiveContext到:如何優化火花SQL運行它並行
- 從蜂巢表選擇數據(1個十億行)
- 做一些過濾,聚合包括ROW_NUMBER在窗函數通過選擇第一行,組數()和max()等
- 結果寫入到HBase的(幾百萬行)
我提交工作來運行它在紗線集羣(100個執行器)上,速度很慢,當我查看Spark UI中的DAG可視化時,似乎只有蜂巢表掃描任務並行運行,其餘步驟#2和#3僅運行在一個哪些實例可能應該能夠優化並行化?
該應用程序看起來像:
步驟1:
val input = hiveContext
.sql(
SELECT
user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
FROM
(
SELECT
user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
, row_number() over
(partition by user_id, address, phone_number, first_name, last_name order by user_id, address, phone_number, first_name, last_name, server_ts desc, age) AS rn
FROM
(
SELECT
user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
FROM
table
WHERE
phone_number <> '911' AND
server_date >= '2015-12-01' and server_date < '2016-01-01' AND
user_id IS NOT NULL AND
first_name IS NOT NULL AND
last_name IS NOT NULL AND
address IS NOT NULL AND
phone_number IS NOT NULL AND
) all_rows
) all_rows_with_row_number
WHERE rn = 1)
val input_tbl = input.registerTempTable(input_tbl)
步驟2:
val result = hiveContext.sql(
SELECT state,
phone_number,
address,
COUNT(*) as hash_count,
MAX(server_ts) as latest_ts
FROM
(SELECT
udf_getState(address) as state
, user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
FROM
input_tbl) input
WHERE state IS NOT NULL AND state != ''
GROUP BY state, phone_number, address)
步驟3:
result.cache()
result.map(x => ...).saveAsNewAPIHadoopDataset(conf)
正如您所看到的,階段0中的「過濾器」,「項目」和「交換」只在一個實例中運行,stage1和stage2也是如此,所以如果問題是幾個問題和道歉啞:
- 在每個執行器進行數據混洗之後,Driver中是否會出現「Filter」,「Project」和「Exchange」?
- 什麼代碼映射到「過濾器」,「項目」和「交換」?
- 我怎麼可以同時運行「過濾器」,「項目」和「交換」來優化性能?
- 可以同時運行stage1和stage2嗎?
您是否檢查過hbase連接器允許下推謂詞?如果是這樣,而不是從HBase提取所有數據,那麼可以讓HBase幫助您至少過濾一些數據。主要的瓶頸通常是I/O和網絡。你的代碼思想中有些東西不清楚。你的桌子代表什麼?它是使用HBase的數據創建的DataFrame嗎?你的輸入數據怎麼樣?恐怕描述有點寬泛。你願意重溫你的問題嗎? – eliasah
@eliasah,感謝您的評論。數據從Hive中提取並存儲到Hbase中。同意瓶頸是I/O和網絡,特別是有很多混洗 - 2TB輸入數據和40GB混洗寫入。我瞭解到,洗牌越少越好,但洗牌也必然與輸入數據的大小有關。如果是這樣,我想知道什麼樣的比例(洗牌/輸入)會是一個很好的比例? –