1

我基本上想在我的驅動程序中編寫一個事件回調函數,它將在該事件到來時重新啓動火花流應用程序。 我的驅動程序通過讀取文件中的配置來設置流和執行邏輯。 每當文件被改變(新CONFIGS加)驅動程序必須做在一個序列中的以下步驟什麼是重新啓動火花流應用程序的最佳方式?

  1. 重啓,
  2. 讀取配置文件(作爲主要方法的一部分)
  3. 設置流

實現此目的的最佳方法是什麼?

回答

0

我目前解決這一問題如下,

  • 訂閱到MQTT主題

  • 在MQTT回調聽外部事件,停止流上下文ssc.stop(true,true)這將正常關閉流和底層 火花配置

  • 通過創建spark conf並再次啓動spark應用程序 設置u p通過讀取配置文件流

// Contents of startSparkApplication() method 
sparkConf = new SparkConf().setAppName("SparkAppName") 
ssc = new StreamingContext(sparkConf, Seconds(1)) 
val myStream = MQTTUtils.createStream(ssc,...) //provide other options 
myStream.print() 
ssc.start() 

的應用程序是爲春季啓動應用程序

0

重新啓動Spark的最佳方式實際上取決於您的環境。但總是可以使用​​控制檯。

您可以像其他linux進程一樣將後臺進程​​作爲後臺進程,並將其置於shell的後臺。在你的情況下,​​工作實際上然後在YARN上運行驅動程序,因此,它是一個嬰兒坐在已經在另一臺機器上通過YARN異步運行的進程。我們最近研究(在火花聚會在這裏)

Cloudera blog

0

的一種方式是通過串聯使用動物園管理員星火實現這一目標。這簡而言之就是使用Apache Curator來監視Zookeeper上的更改(ZK的配置更改,這可以由外部事件觸發),然後導致偵聽器重新啓動。

引用的代碼庫是here,您會發現config中的更改會導致Watcher(Spark流應用程序)在正常關閉並重新加載更改後重新啓動。希望這是一個指針!

1

在某些情況下,你可能要重裝流上下文動態(例如重新加載流操作)。 在這種情況下,你可能(斯卡拉爲例):

val sparkContext = new SparkContext() 

val stopEvent = false 
var streamingContext = Option.empty[StreamingContext] 
val shouldReload = false 

val processThread = new Thread { 
    override def run(): Unit = { 
    while (!stopEvent){ 
     if (streamingContext.isEmpty) { 

     // new context 
     streamingContext = Option(new StreamingContext(sparkContext, Seconds(1))) 

     // create DStreams 
      val lines = streamingContext.socketTextStream(...) 

     // your transformations and actions 
     // and decision to reload streaming context 
     // ... 

     streamingContext.get.start() 
     } else { 
     if (shouldReload) { 
      streamingContext.get.stop(stopSparkContext = false, stopGracefully = true) 
      streamingContext.get.awaitTermination() 
      streamingContext = Option.empty[StreamingContext] 
     } else { 
      Thread.sleep(1000) 
     } 
     } 

    } 
    streamingContext.get.stop(stopSparkContext =true, stopGracefully = true) 
    streamingContext.get.awaitTermination() 
    } 
} 

// and start it in separate thread 
processThread.start() 
processThread.join() 

蟒蛇

spark_context = SparkContext() 

stop_event = Event() 
spark_streaming_context = None 
should_reload = False 

def process(self): 
    while not stop_event.is_set(): 
     if spark_streaming_context is None: 

      # new context 
      spark_streaming_context = StreamingContext(spark_context, 0.5) 

      # create DStreams 
      lines = spark_streaming_context.socketTextStream(...) 

      # your transformations and actions 
      # and decision to reload streaming context 
      # ... 

      self.spark_streaming_context.start() 
     else: 
      # TODO move to config 
      if should_reload: 
       spark_streaming_context.stop(stopSparkContext=False, stopGraceFully=True) 
       spark_streaming_context.awaitTermination() 
       spark_streaming_context = None 
      else: 
       time.sleep(1) 
    else: 
     self.spark_streaming_context.stop(stopGraceFully=True) 
     self.spark_streaming_context.awaitTermination() 


# and start it in separate thread 
process_thread = threading.Thread(target=process) 
process_thread.start() 
process_thread.join() 

如果你想防止崩潰你的代碼,並重新啓動流上下文從最後的地方使用checkpointing機制。 它允許您在失敗後恢復您的工作狀態。

相關問題