2017-03-28 106 views
0

這種問題會導致我管理AmazonDynamoDbClient節流和重試的另一個問題。但是,我認爲解決方案可能在我接到發動機電話之前就已經存在。如何抑制Spark Streaming?

我的高級過程如下:我有一個scala應用程序,它利用Apache Spark讀取大型CSV文件並對它們執行一些聚合,然後將它們寫入發電機。我將這部署到EMR以提供可擴展性。問題是,一旦聚合完成,我們有數百萬條記錄準備好進入發電機,但我們有發電機的寫入能力。它們不需要立即插入,但是可以很好地控制每秒多少次,所以我們可以根據我們的用例對其進行微調。

這裏是什麼,我至今一個代碼示例:

val foreach = new ForeachWriter[Row] { 
    override def process(value: Row): Unit = { 
     //write to dynamo here 
    } 

    override def close(errorOrNull: Throwable): Unit = { 
    } 

    override def open(partitionId: Long, version: Long): Boolean = { 
     true 
    } 
    } 

val query = dataGrouped 
    .writeStream 
    .queryName("DynamoOutput") 
    .format("console") 
    .foreach(foreach) 
    .outputMode(OutputMode.Complete()) 
    .start() 
    .awaitTermination() 

沒有人有任何建議如何解決這個問題呢?

回答

0

您應該查看spark.streaming.backpressure.enabled配置。從documentation

設置最大接收速率 - 如果羣集資源不足以使流式應用程序處理數據的速度與接收速度一樣快,則可以通過設置最大速率限制來限制接收器速率在記錄/秒。查看接收器的配置參數spark.streaming.receiver.maxRate和直接Kafka方法的spark.streaming.kafka.maxRatePerition。在Spark 1.5中,我們引入了一項名爲backpressure的功能,無需設置此速率限制,因爲Spark Streaming會自動計算出速率限制並在處理條件發生變化時動態調整速率限制。可以通過將配置參數spark.streaming.backpressure.enabled設置爲true來啓用該背壓。

+0

所以,我看着這個,但我認爲這只是傳入數據。此外,它不會開始輸出,直到流聚合完成,所以我認爲這隻會減慢完成的時間。 –

+0

這是,但我想象每個工人正在獲取數據,然後將它們寫入Dynamo。如果寫入需要一段時間,背壓設置將有助於確保工作人員不會不知所措併成爲瓶頸。除此之外,我不知道有什麼方法來「扼殺」Spark Streaming。 – Vidya

+0

酷,不用擔心,我會給它一個鏡頭並報告回來。謝謝 –