2015-05-13 46 views
12

我有一個火花流上下文從卡夫卡讀取事件數據,間隔10秒。我想用postgres表中現有的數據來補充這個事件數據。火花流多個來源,重新加載數據幀

我可以加載Postgres的表是這樣的:

val sqlContext = new SQLContext(sc) 
val data = sqlContext.load("jdbc", Map(
    "url" -> url, 
    "dbtable" -> query)) 

...

val broadcasted = sc.broadcast(data.collect()) 

後來我能過去是這樣的:

val db = sc.parallelize(data.value) 
val dataset = stream_data.transform{ rdd => rdd.leftOuterJoin(db)} 

我想保持當前的數據流運行,並且每隔6小時仍然重新載入此表。由於目前apache的火花不支持多個運行上下文,我怎麼能做到這一點?有什麼解決方法嗎?或者我每次需要重新加載數據時都需要重新啓動服務器?這似乎是這樣一個簡單的用例...:/

+0

我也在尋找答案,你有沒有成功,@ user838681? –

+0

當你重新載入postgres表時,你是否關心過去的kafka事件,或者你是否試圖從postgres發生的最新重載時間加入新的kafka數據? –

+0

@HamelKothari 無需更新或重新處理過去的卡夫卡活動。當我更新SQL表時,我只是想將它用於來自Kafka的任何未來事件。 –

回答

1

在我的愚見,在DStreams轉換過程中重新加載另一個數據源是不推薦的設計。

相比傳統stateful流處理模型,d流被設計爲構造一個流計算爲一系列stateless,小的時間間隔deterministic批次計算。

DStreams上的轉換是確定性的,這種設計使得通過重新計算可以快速從故障中恢復。清爽會對恢復/重新計算帶來副作用。

一種解決方法是推遲查詢以輸出操作,例如:foreachRDD(func)