2017-07-15 37 views
1

我正在嘗試在火花流中實現兩個階段的過程。首先,我打開一個kafkaStream,使用auto.offset.reset=earliest讀取主題中已有的所有內容,並在其上訓練我的模型。 我使用流,因爲我無法找到如何做到這一點,而無需打開流(Spark - Get earliest and latest offset of Kafka without opening stream)。 因爲我還沒有發現一種方法來停止流而不停止整個StreamingContext,所以我使用ssc.stop(true, true)停止模型計算後的上下文。用不同的StreamingContext彼此打開兩個KafkaStreams

當我現在嘗試創建一個新的StreamingContext(使用舊的sparkConfig或具有相同參數的新sparkConfig)時,請調用我的方法以new groupId打開一個新的KafkaStream,並且看起來好像沒有流式發生當我爲卡夫卡話題寫新內容時。 forEachRDD中的print(),count()和println都不會導致IDE中的任何輸出。

應用程序的結構是這樣的:

def main(args: Array[String]) { 

    val sparkConf = new SparkConf().setAppName(sparkAppName).setMaster(sparkMaster) 
     .set("spark.local.dir", sparkLocalDir) 
     .set("spark.driver.allowMultipleContexts", "true") 

    sparkConf.registerKryoClasses(Array(classOf[Span])) 
    sparkConf.registerKryoClasses(Array(classOf[Spans])) 
    sparkConf.registerKryoClasses(Array(classOf[java.util.Map[String, String]])) 

    val trainingSsc = new StreamingContext(sparkConf, Seconds(batchInterval)) 
    trainingSsc.checkpoint(checkpoint) 
    //val predictor = (model, ninetynine, median, avg, max) 
    val result = trainKMeans(trainingSsc); 

    trainingSsc.stop(true, false) 

    val predictionSsc = new StreamingContext(sparkConf, Seconds(batchInterval)) 
    val threshold = result._5 
    val model = result._1 

    kMeansAnomalyDetection(predictionSsc, model, threshold) 
} 

我希望你能指點我犯的錯 - 如果你需要進一步的細節只是讓我知道。任何幫助和提示都非常感謝。

回答

1

一般來說,程序看起來像它會在正確的方向,但也有需要修正幾點:在發出streamingContext.start()

星火流將開始流調度。 DStream操作只能由調度程序執行。這意味着排序這兩個調用將不會有任何結果:

val result = trainKMeans(trainingSsc); 
trainingSsc.stop(true, false) 

在任何培訓發生之前流式上下文將停止。

相反,我們應該這樣做:

val result = trainKMeans(trainingSsc) 
trainingSsc.foreachRDD{_ => trainingSsc.stop(false, false) } // note that we don't stop the spark context here 
trainingSsc.start() 
trainingSsc.awaitTermination() 

在這種情況下,我們開始流過程;我們讓第一個區間執行,其中我們的模型將被訓練,然後我們停止處理。

第二流應該在不同的一組比第一個(卡夫卡流創建未在代碼段示出)

對於第二個流上下文啓動,我們缺少開始:

val predictionSsc = new StreamingContext(sparkContext, Seconds(batchInterval)) // note that we pass a sparkContext here, not a config. We reuse the same spark context. 
val threshold = result._5 
val model = result._1 
kMeansAnomalyDetection(predictionSsc, model, threshold) 
predictionSsc.start() 
predictionSsc.awaitTermination() 

在這一點上我們應該有一個工作流。

+0

我真的忘了添加predictions.start() - 這就是爲什麼預測沒有開始。我將評估是否有可能將開始的訓練放在主要方法中 - 但是因爲在上下文停止後我需要對提取的RDD進行一些處理,所以我不太確定如何實現這一點。但是,您的建議看起來像我應該實現的完美結構。 – LST