2

我有一個Spark任務,我需要在每個微批處理中寫入SQL查詢的輸出。寫入操作是一項昂貴的操作,導致批處理執行時間超過批處理間隔。如何在Spark Streaming應用程序中異步寫入行以加快批處理執行速度?

我正在尋找提高寫入性能的方法。

  1. 正在單獨的線程中執行寫操作,如下面的一個好選項所示?

  2. 這是否會導致任何副作用,因爲Spark本身以分佈式方式執行?

  3. 是否有其他更好的方法來加速寫入?

    // Create a fixed thread pool to execute asynchronous tasks 
    val executorService = Executors.newFixedThreadPool(2) 
    dstream.foreachRDD { rdd => 
        import org.apache.spark.sql._ 
        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate 
        import spark.implicits._ 
        import spark.sql 
    
        val records = rdd.toDF("record") 
        records.createOrReplaceTempView("records") 
        val result = spark.sql("select * from records") 
    
        // Submit a asynchronous task to write 
        executorService.submit { 
        new Runnable { 
         override def run(): Unit = { 
         result.write.parquet(output) 
         } 
        } 
        } 
    } 
    

回答

3

1 - 在一個單獨的線程中異步執行寫操作,如下面的一個好選項所示?

號瞭解這裏的問題關鍵是要問「是誰做的寫」。寫入是通過爲集羣中的執行程序分配的作業資源完成的。將寫入命令放置在異步線程池上就像是將新的辦公室管理器添加到具有固定員工的辦公室。考慮到他們必須共享相同的員工,兩名管理人員能夠比單獨工作做更多的工作嗎?那麼,一個合理的答案是「只有當第一位經理沒有給他們足夠的工作,所以有一些免費的能力」。

回到我們的集羣,我們正在處理的是在IO重寫操作。並行寫作業將導致爭用IO資源,從而使每個獨立作業更長。最初,我們的工作看起來可能比「單一經理版」更好,但麻煩最終會打擊我們。 我製作了一張圖表,試圖說明這是如何工作的。請注意,並行作業將花費比例更長的時間,以使它們在時間線上併發。

sequential vs parallel jobs in Spark Streaming

一旦我們達到這個地步的工作開始得到延遲,我們有一個穩定的工作,最終將失敗。

2-這會引起任何的副作用,因爲火花本身以分佈式的方式執行?

我能想到的一些影響:

  • 可能更高集羣的負載和IO爭。
  • 作業在線程池隊列上而不是在Spark Streaming Queue上排隊。我們放棄了通過Spark UI和監視API監視我們的工作的能力,因爲從Spark Streaming的角度來看,延遲是「隱藏的」並且一切正常。

3-是否有其他更好的方法來加速寫入? (從廉價責令貴)

  • 如果要追加到拼花文件,通常會創建一個新的文件。追加隨時間而變得昂貴。
  • 增加批處理間隔或使用窗口操作來編寫更大的Parquet塊。實木複合地板喜歡大文件
  • 調分區和數據的分配比例=>確保星火可以做平行
  • 提高羣集資源的寫,如果有必要增加更多的節點
  • 使用速度更快的存儲
+0

感謝您的詳細解釋!這絕對有幫助。將嘗試改進性能的替代選項。 – vijay

+0

@vijay你的問題是回答?考慮接受答案關閉它。 – maasg

1

是做在一個單獨的線程的寫入操作異步像下面所示的一個很好的選擇?

是的。在優化昂貴的查詢並將結果保存到外部數據存儲時,這當然是需要考慮的事情。

這是否會導致任何副作用,因爲Spark本身以分佈式方式執行?

不這麼認爲。 SparkContext是線程安全的,並促進這種查詢執行。

是否有其他的/更好的方法來加快寫入?

是的!這是瞭解何時使用其他(上述)選項的關鍵。默認情況下,Spark應用程序以FIFO調度模式運行。

報價Scheduling Within an Application

默認情況下,星火的調度運行工作在FIFO方式。每個工作分爲「階段」(例如地圖和縮小階段),第一份工作優先考慮所有可用資源,其階段有任務啓動,然後第二份工作優先等等。如果工作在隊列不需要使用整個集羣,以後的作業可以立即開始運行,但是如果隊列頭部的作業很大,則後面的作業可能會顯着延遲。

從Spark 0.8開始,也可以在作業之間配置公平共享。在公平分享下,Spark以「循環」方式在作業之間分配任務,以便所有作業獲得大致相等的羣集資源份額。這意味着在長時間工作時提交的短工可以立即開始接收資源,並且仍然可以獲得良好的響應時間,而無需等待長時間的工作。該模式最適合多用戶設置。

這意味着,做出一個房間異步執行多次寫入和並行你應該配置你的星火應用程序使用公平調度模式(使用spark.scheduler.mode屬性)。

您必須配置所謂的Fair Scheduler Pools以將執行程序資源(CPU和內存)「劃分」到可使用spark.scheduler.pool屬性分配給作業的池中。

報價Fair Scheduler Pools

沒有任何干預,新提交的工作進入一個默認池,但喬布斯的池可以在多數民衆贊成提交他們的線程加入spark.scheduler.pool‘本地屬性’的SparkContext設置。

+0

謝謝,會試試這個。 – vijay

相關問題