12
我有一個火花流上下文從卡夫卡讀取事件數據,間隔10秒。我想用postgres表中現有的數據來補充這個事件數據。火花流多個來源,重新加載數據幀
我可以加載Postgres的表是這樣的:
val sqlContext = new SQLContext(sc)
val data = sqlContext.load("jdbc", Map(
"url" -> url,
"dbtable" -> query))
...
val broadcasted = sc.broadcast(data.collect())
後來我能過去是這樣的:
val db = sc.parallelize(data.value)
val dataset = stream_data.transform{ rdd => rdd.leftOuterJoin(db)}
我想保持當前的數據流運行,並且每隔6小時仍然重新載入此表。由於目前apache的火花不支持多個運行上下文,我怎麼能做到這一點?有什麼解決方法嗎?或者我每次需要重新加載數據時都需要重新啓動服務器?這似乎是這樣一個簡單的用例...:/
我也在尋找答案,你有沒有成功,@ user838681? –
當你重新載入postgres表時,你是否關心過去的kafka事件,或者你是否試圖從postgres發生的最新重載時間加入新的kafka數據? –
@HamelKothari 無需更新或重新處理過去的卡夫卡活動。當我更新SQL表時,我只是想將它用於來自Kafka的任何未來事件。 –