2016-04-25 58 views
3

中流更改配置我有一個Spark流/ DSTREAM應用程式,例如this與檢查點的星火流

// Function to create and setup a new StreamingContext 
def functionToCreateContext(): StreamingContext = { 
    val ssc = new StreamingContext(...) // new context 
    val lines = ssc.socketTextStream(...) // create DStreams 
    ... 
    ssc.checkpoint(checkpointDirectory) // set checkpoint directory 
    ssc 
} 

// Get StreamingContext from checkpoint data or create a new one 
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) 

// Do additional setup on context that needs to be done, 
// irrespective of whether it is being started or restarted 
context. ... 

// Start the context 
context.start() 
context.awaitTermination() 

凡我上下文使用配置文件,在那裏我可以拉與像appConf.getString方法的項目。所以我實際使用:

val context = StreamingContext.getOrCreate(
    appConf.getString("spark.checkpointDirectory"), 
    () => createStreamContext(sparkConf, appConf)) 

其中val sparkConf = new SparkConf()...

如果我停止我的應用程序並更改應用程序文件中的配置,除非刪除檢查點目錄內容,否則不會選取這些更改。例如,我想動態更改spark.streaming.kafka.maxRatePerPartitionspark.windowDurationSecs。 (編輯:我殺了應用程序,更改配置文件,然後重新啓動應用程序。)我怎樣才能動態更改這些設置或強制執行(EDITED WORD)配置更改而不浪費我的檢查點目錄(即將包含檢查點爲狀態信息)?

回答

1

我怎會動態更改這些設置或執行更改配置,而不搗毀我檢查點目錄?

如果潛入了StreamingContext.getOrCreate代碼:

def getOrCreate(
    checkpointPath: String, 
    creatingFunc:() => StreamingContext, 
    hadoopConf: Configuration = SparkHadoopUtil.get.conf, 
    createOnError: Boolean = false 
): StreamingContext = { 
    val checkpointOption = CheckpointReader.read(
     checkpointPath, new SparkConf(), hadoopConf, createOnError) 
    checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc()) 
} 

你可以看到,如果CheckpointReader已在類路徑設置檢查點的數據,它使用new SparkConf()作爲參數,因爲超載不允許傳遞一個自定義創建的SparkConf。默認情況下,SparkConf將加載任何設置聲明爲環境變量或傳遞到類路徑:

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { 

    import SparkConf._ 

    /** Create a SparkConf that loads defaults from system properties and the classpath */ 
    def this() = this(true) 

所以實現的一種方式,你想要的是,而不是創建在代碼中SparkConf對象,你可以通過參數通過spark.driver.extraClassPathspark.executor.extraClassPath到​​。

2

您是否按照文檔建議的方式使用StreamingContext.getOrCreate創建了流式上下文,它將前面的checkpointDirectory作爲參數?

// Function to create and setup a new StreamingContext 
def functionToCreateContext(): StreamingContext = { 
    val ssc = new StreamingContext(...) // new context 
    val lines = ssc.socketTextStream(...) // create DStreams 
    ... 
    ssc.checkpoint(checkpointDirectory) // set checkpoint directory 
    ssc 
} 

// Get StreamingContext from checkpoint data or create a new one 
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) 

// Do additional setup on context that needs to be done, 
// irrespective of whether it is being started or restarted 
context. ... 

// Start the context 
context.start() 
context.awaitTermination() 

http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing

+0

是的,我做了與上面幾乎相同的事情。唯一的主要區別是我的'ssc.socketTextStream(...)'更像''KafkaUtils.createDirectStream' ssc作爲參數。 – codeaperature

+0

花了一段時間才弄清楚了這一點......我錯過了一個非常微妙的地方,我需要完整的瞭解這張照片。如果我使用'context.sparkContext.getConf.set(「spark.xxxx」,「10」)',我可以設置任何需要使用的Spark Conf參數(在你提供的位置)。這是我需要知道的最後一部分。 – codeaperature

+0

真棒,很高興它的工作!感謝你的接納! –

1

當您從檢查點目錄恢復時,無法添加/更新火花配置。您可以在文檔中找到火花點檢查行爲:

當程序第一次啓動時,它會創建一個新的StreamingContext,設置所有的流,然後調用start()。 當程序被破壞後重新啓動,它會在檢查點目錄

所以如果你使用檢查點目錄,然後對工作的重新啓動將重新創建一個的StreamingContext重新創建檢查點數據的StreamingContext檢查點數據將具有舊的sparkConf。