3

我是火花新手,使用有一個簡單的火花申請星火SQL/hiveContext到:如何優化火花SQL運行它並行

  1. 從蜂巢表選擇數據(1個十億行)
  2. 做一些過濾,聚合包括ROW_NUMBER在窗函數通過選擇第一行,組數()和max()等
  3. 結果寫入到HBase的(幾百萬行)

我提交工作來運行它在紗線集羣(100個執行器)上,速度很慢,當我查看Spark UI中的DAG可視化時,似乎只有蜂巢表掃描任務並行運行,其餘步驟#2和#3僅運行在一個哪些實例可能應該能夠優化並行化?

該應用程序看起來像:

步驟1:

val input = hiveContext 
    .sql(
    SELECT 
      user_id 
      , address 
      , age 
      , phone_number 
      , first_name 
      , last_name 
      , server_ts 
     FROM 
     (  
      SELECT 
       user_id 
       , address 
       , age 
       , phone_number 
       , first_name 
       , last_name 
       , server_ts 
       , row_number() over 
       (partition by user_id, address, phone_number, first_name, last_name order by user_id, address, phone_number, first_name, last_name, server_ts desc, age) AS rn 
      FROM 
      ( 
       SELECT 
        user_id 
        , address 
        , age 
        , phone_number 
        , first_name 
        , last_name 
        , server_ts 
       FROM 
        table 
       WHERE 
        phone_number <> '911' AND 
        server_date >= '2015-12-01' and server_date < '2016-01-01' AND 
        user_id IS NOT NULL AND 
        first_name IS NOT NULL AND 
        last_name IS NOT NULL AND 
        address IS NOT NULL AND 
        phone_number IS NOT NULL AND 
      ) all_rows 
     ) all_rows_with_row_number 
     WHERE rn = 1) 

val input_tbl = input.registerTempTable(input_tbl) 

步驟2:

val result = hiveContext.sql(
    SELECT state, 
     phone_number, 
     address, 
     COUNT(*) as hash_count, 
     MAX(server_ts) as latest_ts 
    FROM 
    (SELECT 
     udf_getState(address) as state 
     , user_id 
     , address 
     , age 
     , phone_number 
     , first_name 
     , last_name 
     , server_ts 
    FROM 
     input_tbl) input 
    WHERE state IS NOT NULL AND state != '' 
    GROUP BY state, phone_number, address) 

步驟3:

result.cache() 
result.map(x => ...).saveAsNewAPIHadoopDataset(conf) 

的DAG可視化看起來像: enter image description here

正如您所看到的,階段0中的「過濾器」,「項目」和「交換」只在一個實例中運行,stage1和stage2也是如此,所以如果問題是幾個問題和道歉啞:

  1. 在每個執行器進行數據混洗之後,Driver中是否會出現「Filter」,「Project」和「Exchange」?
  2. 什麼代碼映射到「過濾器」,「項目」和「交換」?
  3. 我怎麼可以同時運行「過濾器」,「項目」和「交換」來優化性能?
  4. 可以同時運行stage1和stage2嗎?
+0

您是否檢查過hbase連接器允許下推謂詞?如果是這樣,而不是從HBase提取所有數據,那麼可以讓HBase幫助您至少過濾一些數據。主要的瓶頸通常是I/O和網絡。你的代碼思想中有些東西不清楚。你的桌子代表什麼?它是使用HBase的數據創建的DataFrame嗎?你的輸入數據怎麼樣?恐怕描述有點寬泛。你願意重溫你的問題嗎? – eliasah

+0

@eliasah,感謝您的評論。數據從Hive中提取並存儲到Hbase中。同意瓶頸是I/O和網絡,特別是有很多混洗 - 2TB輸入數據和40GB混洗寫入。我瞭解到,洗牌越少越好,但洗牌也必然與輸入數據的大小有關。如果是這樣,我想知道什麼樣的比例(洗牌/輸入)會是一個很好的比例? –

回答

3

你不能正確讀取DAG圖 - 每個步驟是使用單箱並不意味着它不使用多個任務(因此核心)來計算可視化的事實步。

通過鑽入舞臺視圖中可以看到每個步驟使用了多少個任務,該視圖顯示該階段的所有任務。

例如,這裏有一個樣本DAG可視化與你相似:

enter image description here

你可以看到每個階段由步驟「單」欄所示。

但是,如果我們看看下面的表格中,我們可以看到任務的每級數量:

enter image description here

其中之一是隻使用2個任務,但其他使用220,這意味着數據被分割成220個分區,並且分區被並行處理,給定足夠的可用資源。

如果深入瞭解該階段,則可以再次看到它已使用了220個任務和所有任務的詳細信息。從磁盤

enter image description here

只有任務讀取數據顯示在圖中具有這些「多點」,以幫助您瞭解多少文件是如何被讀取。

SO--正如Rashid的回答所暗示的,檢查每個階段的任務數量。

+0

只是爲了增加上面的內容;​​最好是沒有更少的階段,因爲這會表明更少的階段。 「廣泛的依賴性」,從而減少整個集羣的數據移動。 – sourabh

+0

謝謝@Tzach Zohar。這裏學到的絕對新東西!是否還有更多相關文章,我可以閱讀/學習的幻燈片?您的意見已經非常豐富,只是想了解更多信息:) –

+0

謝謝@sourabh。在將數據寫入HBase之前,我還嘗試使用result.cache()和result.repartition(),以查看它們是否對性能優化有用。我沒有看到緩存()反映在DAG圖中,但重新分區在階段2中顯示在DAG中作爲一個框。由於這是一個額外的步驟來執行,我想知道如何將有利於整體性能? –

1

這不是很明顯,所以我會做以下事情以解決問題。

  1. 計算每個步驟的執行時間。
  2. 如果您的表格是文本格式,則第一步可能會很慢,如果數據以實木複合地格式存儲在Hive中,那麼spark通常會更好。
  3. 查看您的表是否由where子句中使用的列分區。
  4. 如果將數據保存到Hbase的速度很慢,那麼您可能需要預先拆分hbase表,因爲默認情況下數據存儲在單個區域中。
  5. 看看階段選項卡中的火花UI,看看有多少任務都開始爲每一個階段,也找數據地方一級描述here

希望,你將能夠零的問題。

+0

謝謝@Rashid Ali。 1.每個步驟的執行時間在spark UI中顯示。據推測,它是準確的提到:) –

+0

2.它的獸人文件 3.它是由幾個在where子句中使用的列分區,而不是所有:( 4.我已經預先將數據按字符,例如{SPLITS => ['a','b','c'...]}作爲行鍵是字符的組合,不確定我是否以這種方式做對了 5.有超過20K的數據讀取任務配置單元,其他階段的任務編號是2001年和64年,大概所有的任務都是並行運行:)我想知道如何讓它更平行。 –

+0

哪一步需要更長的時間,需要進行一些優化,它是讀取配置單元還是將數據存儲到hbase中。關於hbase拆分,如果table沒有正確拆分,那麼數據可以寫入一個或多個可能成爲瓶頸的區域。分割是根據鍵分配數據的關鍵範圍。記住大量的任務總是不好的跡象。例如,如果您的數據是分區的,並且查詢正在從一個或兩個分區讀取數據,而不是大量的任務,則意味着將讀取完整的表,這不是預期的。我希望這個能幫上忙。 –