這種問題會導致我管理AmazonDynamoDbClient節流和重試的另一個問題。但是,我認爲解決方案可能在我接到發動機電話之前就已經存在。如何抑制Spark Streaming?
我的高級過程如下:我有一個scala應用程序,它利用Apache Spark讀取大型CSV文件並對它們執行一些聚合,然後將它們寫入發電機。我將這部署到EMR以提供可擴展性。問題是,一旦聚合完成,我們有數百萬條記錄準備好進入發電機,但我們有發電機的寫入能力。它們不需要立即插入,但是可以很好地控制每秒多少次,所以我們可以根據我們的用例對其進行微調。
這裏是什麼,我至今一個代碼示例:
val foreach = new ForeachWriter[Row] {
override def process(value: Row): Unit = {
//write to dynamo here
}
override def close(errorOrNull: Throwable): Unit = {
}
override def open(partitionId: Long, version: Long): Boolean = {
true
}
}
val query = dataGrouped
.writeStream
.queryName("DynamoOutput")
.format("console")
.foreach(foreach)
.outputMode(OutputMode.Complete())
.start()
.awaitTermination()
沒有人有任何建議如何解決這個問題呢?
所以,我看着這個,但我認爲這只是傳入數據。此外,它不會開始輸出,直到流聚合完成,所以我認爲這隻會減慢完成的時間。 –
這是,但我想象每個工人正在獲取數據,然後將它們寫入Dynamo。如果寫入需要一段時間,背壓設置將有助於確保工作人員不會不知所措併成爲瓶頸。除此之外,我不知道有什麼方法來「扼殺」Spark Streaming。 – Vidya
酷,不用擔心,我會給它一個鏡頭並報告回來。謝謝 –