2015-10-02 76 views
2

我運行一個流的工作,要逐步建立查找地圖(追蹤獨特的物品,例如過濾重複的進帳),起初我想,以保持在高速緩存中一個DataFrame,而在每個批次創建新DataFrame,像這樣的工會就如何懶洋洋地建立從火花流數據的高速緩存

items.foreachRDD((rdd: RDD[String]) => { 
    ... 
     val uf = rdd.toDF 
     cached_df = cached_df.unionAll(uf) 
     cached_df.cache 
     cached_df.count // materialize the 
    ... 
    }) 

我擔心的是,cached_df似乎記住所有的譜系以前從每批次反覆追加RDD S,在我的情況下,如果我不」如果它崩潰,那麼請不要重新計算這個緩存的RDD,那是維護日益增長的DAG的開銷嗎?

作爲替代方案,在每個批次的開始時,我加載查找從拼花文件,而不是保持它在存儲器中,然後在每一批結束時,我追加新RDD到相同的鑲木地板的文件:

noDuplicatedDF.write.mode(SaveMode.Append).parquet("lookup") 

可正常工作,但那裏保持查找內存中直接的方式?

感謝 萬春

回答

1

追加到實木複合地板絕對是正確的做法。但是,您可以優化查找。如果內存緩存稍微延遲(即沒有最新的第二個數據),那麼你可以週期性地(比如說每5分鐘)在內存中加載當前的「lookup」parquet表(假設它適合)。所有查詢查詢將查找最新的5分鐘快照。

你也可以管道裝載到內存和在不同的線程的查詢服務。

+0

是的,我每滑動間隔加載查找文件一次。如來,非常感謝! – Wanchun

+0

@tathagata能否請你幫我理解這個問題'的http:// stackoverflow.com /問題/ 40153442 /喬布斯是,排隊等候的功能於sparkstreaming' – Naresh