中流更改配置我有一個Spark流/ DSTREAM應用程式,例如this:與檢查點的星火流
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
凡我上下文使用配置文件,在那裏我可以拉與像appConf.getString
方法的項目。所以我實際使用:
val context = StreamingContext.getOrCreate(
appConf.getString("spark.checkpointDirectory"),
() => createStreamContext(sparkConf, appConf))
其中val sparkConf = new SparkConf()...
。
如果我停止我的應用程序並更改應用程序文件中的配置,除非刪除檢查點目錄內容,否則不會選取這些更改。例如,我想動態更改spark.streaming.kafka.maxRatePerPartition
或spark.windowDurationSecs
。 (編輯:我殺了應用程序,更改配置文件,然後重新啓動應用程序。)我怎樣才能動態更改這些設置或強制執行(EDITED WORD)配置更改而不浪費我的檢查點目錄(即將包含檢查點爲狀態信息)?
是的,我做了與上面幾乎相同的事情。唯一的主要區別是我的'ssc.socketTextStream(...)'更像''KafkaUtils.createDirectStream' ssc作爲參數。 – codeaperature
花了一段時間才弄清楚了這一點......我錯過了一個非常微妙的地方,我需要完整的瞭解這張照片。如果我使用'context.sparkContext.getConf.set(「spark.xxxx」,「10」)',我可以設置任何需要使用的Spark Conf參數(在你提供的位置)。這是我需要知道的最後一部分。 – codeaperature
真棒,很高興它的工作!感謝你的接納! –