我基本上想在我的驅動程序中編寫一個事件回調函數,它將在該事件到來時重新啓動火花流應用程序。 我的驅動程序通過讀取文件中的配置來設置流和執行邏輯。 每當文件被改變(新CONFIGS加)驅動程序必須做在一個序列中的以下步驟什麼是重新啓動火花流應用程序的最佳方式?
- 重啓,
- 讀取配置文件(作爲主要方法的一部分) 和
- 設置流
實現此目的的最佳方法是什麼?
我基本上想在我的驅動程序中編寫一個事件回調函數,它將在該事件到來時重新啓動火花流應用程序。 我的驅動程序通過讀取文件中的配置來設置流和執行邏輯。 每當文件被改變(新CONFIGS加)驅動程序必須做在一個序列中的以下步驟什麼是重新啓動火花流應用程序的最佳方式?
實現此目的的最佳方法是什麼?
我目前解決這一問題如下,
訂閱到MQTT主題
在MQTT回調聽外部事件,停止流上下文ssc.stop(true,true)
這將正常關閉流和底層 火花配置
通過創建spark conf並再次啓動spark應用程序 設置u p通過讀取配置文件流
// Contents of startSparkApplication() method sparkConf = new SparkConf().setAppName("SparkAppName") ssc = new StreamingContext(sparkConf, Seconds(1)) val myStream = MQTTUtils.createStream(ssc,...) //provide other options myStream.print() ssc.start()
的應用程序是爲春季啓動應用程序
重新啓動Spark
的最佳方式實際上取決於您的環境。但總是可以使用控制檯。
您可以像其他linux
進程一樣將後臺進程作爲後臺進程,並將其置於shell
的後臺。在你的情況下,工作實際上然後在YARN
上運行驅動程序,因此,它是一個嬰兒坐在已經在另一臺機器上通過YARN
異步運行的進程。我們最近研究(在火花聚會在這裏)
的一種方式是通過串聯使用動物園管理員星火實現這一目標。這簡而言之就是使用Apache Curator來監視Zookeeper上的更改(ZK的配置更改,這可以由外部事件觸發),然後導致偵聽器重新啓動。
引用的代碼庫是here,您會發現config中的更改會導致Watcher(Spark流應用程序)在正常關閉並重新加載更改後重新啓動。希望這是一個指針!
在某些情況下,你可能要重裝流上下文動態(例如重新加載流操作)。 在這種情況下,你可能(斯卡拉爲例):
val sparkContext = new SparkContext()
val stopEvent = false
var streamingContext = Option.empty[StreamingContext]
val shouldReload = false
val processThread = new Thread {
override def run(): Unit = {
while (!stopEvent){
if (streamingContext.isEmpty) {
// new context
streamingContext = Option(new StreamingContext(sparkContext, Seconds(1)))
// create DStreams
val lines = streamingContext.socketTextStream(...)
// your transformations and actions
// and decision to reload streaming context
// ...
streamingContext.get.start()
} else {
if (shouldReload) {
streamingContext.get.stop(stopSparkContext = false, stopGracefully = true)
streamingContext.get.awaitTermination()
streamingContext = Option.empty[StreamingContext]
} else {
Thread.sleep(1000)
}
}
}
streamingContext.get.stop(stopSparkContext =true, stopGracefully = true)
streamingContext.get.awaitTermination()
}
}
// and start it in separate thread
processThread.start()
processThread.join()
或蟒蛇:
spark_context = SparkContext()
stop_event = Event()
spark_streaming_context = None
should_reload = False
def process(self):
while not stop_event.is_set():
if spark_streaming_context is None:
# new context
spark_streaming_context = StreamingContext(spark_context, 0.5)
# create DStreams
lines = spark_streaming_context.socketTextStream(...)
# your transformations and actions
# and decision to reload streaming context
# ...
self.spark_streaming_context.start()
else:
# TODO move to config
if should_reload:
spark_streaming_context.stop(stopSparkContext=False, stopGraceFully=True)
spark_streaming_context.awaitTermination()
spark_streaming_context = None
else:
time.sleep(1)
else:
self.spark_streaming_context.stop(stopGraceFully=True)
self.spark_streaming_context.awaitTermination()
# and start it in separate thread
process_thread = threading.Thread(target=process)
process_thread.start()
process_thread.join()
如果你想防止崩潰你的代碼,並重新啓動流上下文從最後的地方使用checkpointing機制。 它允許您在失敗後恢復您的工作狀態。