0

我有一個小型Hive表,在HDFS(parquet/1152文件 - 超過30GB)上保存了1500萬行。如何在沒有重新分區的情況下並行執行Spark UDF

我在做科學文摘的LDA。因此,第一步是使用StanfordNLP提取一些名詞短語/塊短語,我寫了一個UDF來實現這個目標。

現在性能明智,有兩種情況,每種情況都有非常不同的結果。

方案1:

val hiveTable = hivecontext.sql(""" 
SELECT ab AS text, 
      pmid AS id 
    FROM scientific.medline  
    LIMIT 15000000 
""") 

然後,我打電話給我的UDF在我hiveTable

val postagsDF = hiveTable.withColumn("words", StanfordNLP.posPhrases(col("text"))) 

現在,如果我觸發任何動作/轉換如.Count之間()或做CountVectorizer( )on「postagsDF」我看到2個階段。一個具有適當數量的任務(分區),另一個僅具有一個任務。在進行一些輸入/隨機寫入後,第一個結束非常快,但只有一個任務的第二個結束需要很長時間。看來我的UDF在這個只有一項任務的階段正在執行。 (需要時間,沒有資源的活動期間完成)

方案2:

val hiveTable = hivecontext.sql(""" 
SELECT ab AS text, 
      pmid AS id 
    FROM scientific.medline  
    LIMIT 15000000 
""") 

我重新分區我DataFrame根據檢察院的數量火花檢測分區的確切數目。 (我還可以選擇其他號碼,但該號碼看起來不錯,因爲我有提供超過500個核心 - 每個核心2個任務)

val repartitionedDocDF = docDF.repartition(1152) 

現在叫我的UDF在我hiveTable

val postagsDF = hiveTable.withColumn("words", StanfordNLP.posPhrases(col("text"))) 

然而,任何這次行動/轉型將分四個階段。兩個階段(比如說count)是1152個任務,其中兩個是單個任務。我可以看到我的UDF正在執行其中一個階段,所有執行者使用我的整個集羣的1152個任務正確執行。

場景編號1的結果: 看看我的羣集,在長時間運行的單任務階段沒有太多進展。沒有CPU使用情況,沒有內存,沒有網絡,也沒有IO活動。只有一個執行者,其中一項任務是在每個文檔/列上應用我的UDF。

基準測試:方案1號需要3-4小時才能完成100萬行。 (我不能等待,看看它有多少花費15萬行)的情況下2號

結果: 看着我的羣,我可以清楚地看到所有正在利用我的資源。我所有的節點幾乎都處於滿負荷狀態。

基準測試:對於15萬行,場景2需要30分鐘以上。

enter image description here

真正的問題

  1. 剛剛發生了什麼?我認爲在Dataframe上的UDF會默認並行運行。如果部分/任務的數量多於或少於核心總數,也許會重新分區,但至少在默認的200個分區/任務上是並行的。我只想了解爲什麼在我的情況下UDf是單一的任務,並忽略默認的200和實際的分區大小。 (這不僅僅是關於性能,它是單任務作業vs多任務作業)

  2. 是否有任何其他方法可以使UDF在所有執行器上並行執行而不會調用重新分區。我沒有反對重新分區的意見,但這是非常昂貴的操作,我認爲它不應該是使UDF平行運行的唯一方法。即使當我重新分區到完全相同數量的分區/文件時,我仍然需要監視20GB以上的隨機讀取和寫入操作,才能在羣集上飛行。

我看了一下重新分區和UDF一切,但我找不到哪一個可以默認不平行,除非它重新分區運行UDF類似的問題。 (簡單的UDF,當你從int到bigint投出一列的類型可能不可見,但是當你做NLP它真的是可見的)

我的集羣大小:30個節點(16core/32G) - Spark 1.6 Cloudera CDH 5.11.1 星火:--driver-cores 5 --driver-memory 8g --executor-cores 5 --executor-memory 5g --num-executors 116

非常感謝,

UPDATE

我跑沒有LIMIT子句相同的代碼,並在18分鐘內做到了!所以限的理由(關於這一點的答案):

enter image description here

回答

2

這裏的問題是具體涉及到你在查詢中使用LIMIT條款,並有無關UDF。子句將所有結果數據重新分區到單個分區,因此它不適用於大樣本。

如果你想避免這個問題,在某種程度上減少的記錄數,最好先取樣資料:

val p: Double = ??? 

spark.sql(s"SELECT * FROM df TABLESAMPLE($p percent)") 

或:

spark.table("df").sample(false, p) 

其中p是一個記錄的所需部分。

請注意,採用精確數量的值進行採樣將遭受與LIMIT子句相同的問題。

+0

謝謝!你是對的! LIMIT子句是單分區的全部原因,以及爲什麼重分區表現如此。我上週花了一天一夜的時間尋找這個!關於上次陳述的一個問題,以防萬一再次遇到同樣的問題,請您舉一個例子,說明「具有確切值的採樣會遭受同樣的問題」嗎?非常感謝,我真的很感謝你的幫助。我用你的答案結果更新了我的問題,證明你的答案是正確的。 – Maziyar

+1

是的 - 它不是特別直觀或友好的行爲。當某些東西不能表現時,許多人的第一步就是嘗試更小的一組。如果火花不打算人們使用限制,它應該真的發出警告。 – user48956

+0

我絕對同意。完美的捕捉在你身邊,非常感謝。 – Maziyar

相關問題