我正在嘗試在火花流中實現兩個階段的過程。首先,我打開一個kafkaStream,使用auto.offset.reset=earliest
讀取主題中已有的所有內容,並在其上訓練我的模型。 我使用流,因爲我無法找到如何做到這一點,而無需打開流(Spark - Get earliest and latest offset of Kafka without opening stream)。 因爲我還沒有發現一種方法來停止流而不停止整個StreamingContext,所以我使用ssc.stop(true, true)
停止模型計算後的上下文。用不同的StreamingContext彼此打開兩個KafkaStreams
當我現在嘗試創建一個新的StreamingContext(使用舊的sparkConfig或具有相同參數的新sparkConfig)時,請調用我的方法以new groupId打開一個新的KafkaStream,並且看起來好像沒有流式發生當我爲卡夫卡話題寫新內容時。 forEachRDD中的print(),count()和println都不會導致IDE中的任何輸出。
應用程序的結構是這樣的:
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName(sparkAppName).setMaster(sparkMaster)
.set("spark.local.dir", sparkLocalDir)
.set("spark.driver.allowMultipleContexts", "true")
sparkConf.registerKryoClasses(Array(classOf[Span]))
sparkConf.registerKryoClasses(Array(classOf[Spans]))
sparkConf.registerKryoClasses(Array(classOf[java.util.Map[String, String]]))
val trainingSsc = new StreamingContext(sparkConf, Seconds(batchInterval))
trainingSsc.checkpoint(checkpoint)
//val predictor = (model, ninetynine, median, avg, max)
val result = trainKMeans(trainingSsc);
trainingSsc.stop(true, false)
val predictionSsc = new StreamingContext(sparkConf, Seconds(batchInterval))
val threshold = result._5
val model = result._1
kMeansAnomalyDetection(predictionSsc, model, threshold)
}
我希望你能指點我犯的錯 - 如果你需要進一步的細節只是讓我知道。任何幫助和提示都非常感謝。
我真的忘了添加predictions.start() - 這就是爲什麼預測沒有開始。我將評估是否有可能將開始的訓練放在主要方法中 - 但是因爲在上下文停止後我需要對提取的RDD進行一些處理,所以我不太確定如何實現這一點。但是,您的建議看起來像我應該實現的完美結構。 – LST