2
我運行一個流的工作,要逐步建立查找地圖(追蹤獨特的物品,例如過濾重複的進帳),起初我想,以保持在高速緩存中一個DataFrame
,而在每個批次創建新DataFrame
,像這樣的工會就如何懶洋洋地建立從火花流數據的高速緩存
items.foreachRDD((rdd: RDD[String]) => {
...
val uf = rdd.toDF
cached_df = cached_df.unionAll(uf)
cached_df.cache
cached_df.count // materialize the
...
})
我擔心的是,cached_df似乎記住所有的譜系以前從每批次反覆追加RDD
S,在我的情況下,如果我不」如果它崩潰,那麼請不要重新計算這個緩存的RDD
,那是維護日益增長的DAG的開銷嗎?
作爲替代方案,在每個批次的開始時,我加載查找從拼花文件,而不是保持它在存儲器中,然後在每一批結束時,我追加新RDD
到相同的鑲木地板的文件:
noDuplicatedDF.write.mode(SaveMode.Append).parquet("lookup")
可正常工作,但那裏保持查找內存中直接的方式?
感謝 萬春
是的,我每滑動間隔加載查找文件一次。如來,非常感謝! – Wanchun
@tathagata能否請你幫我理解這個問題'的http:// stackoverflow.com /問題/ 40153442 /喬布斯是,排隊等候的功能於sparkstreaming' – Naresh