2015-07-20 112 views
3

我有一個RDD,它是通過讀取一個大小爲117MB的本地文本文件形成的。高速緩存後「採取」行動RDD導致只有2%高速緩存

scala> rdd 
res87: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:24 

我緩存RDD: -

scala> rdd.persist() 
res84: rdd.type = MapPartitionsRDD[3] at textFile at <console>:24 

這之後我稱之爲 '取(1)' 行動對RDD強制evalulation。一旦完成,我檢查Spark UI的存儲頁面。它顯示緩存的分數僅爲2%,內存大小爲6.5MB。然後我在RDD上調用'count'的操作。在此之後,當我檢查Spark UI存儲頁面時,我突然發現這些數字現在已經改變。緩存的分數爲82%,內存大小爲258.2MB。這是否意味着即使在緩存RDD之後,Spark也只會緩存後續操作真正需要的內容(因爲需要(1)只讀取一個頂部元素)?當第二個動作「count」被觸發時,它需要觸摸所有元素,所以它最終會緩存剩餘的部分?我還沒有遇到過這樣的記錄行爲,這是一個錯誤嗎?

回答

1

根據源代碼,你是對的。 RDD引用僅保存到持久RDD的HashMap中,並在調用persist()時使用特殊清除器進行註冊。所以在實際讀取數據期間執行高速緩存。更多的可以被置換(例如,當沒有足夠的內存和對數據的有效引用不存在時)。

+0

我還是很困惑。我明白緩存是在觸發動作時執行的,但'take'也是一個動作,即使take(1)只讀取第一個元素,但在技術上仍然是一個動作。這些文檔並沒有提到這種稱爲部分緩存的事情。只要觸發了任何操作,緩存就會發生。另外,如果我們要說它正在緩存任何正在讀的內容,那聽起來也不對,因爲第一個元素只是一行文本(遠小於1KB)。它不需要6.5MB來存儲它。 – Dhiraj

+0

1行是你想要的,但hdfs在大塊中存儲數據(以MB爲單位)。當你問一條線時,無論如何都要讀取整個塊。當你問兩條線時,會讀取1或2個塊(線位置可能在同一個塊中或不同)。 – Zernike

+0

但是這個例子我在我的本地系統模式下嘗試過Scala shell。所以使用的文件系統是本地文件系統。 – Dhiraj

1

Spark僅根據需要實現rdds,即響應上一個答案中提到的操作。大多數操作都需要讀取rdd的所有分區,例如我們的count(),但其他操作不需要實現所有分區,也不需要其他性能原因。 take(x)first(),這實質上是take(1),是這樣的動作的例子。想象一下你有一個擁有數百萬記錄和許多分區的rdd,你只需要通過take(x)來檢查幾條記錄。實現整個rdd將是浪費。相反,Spark實現了一個分區並檢查了它包含的項目數量。基於這個數字,它實現了更多的分區來滿足take(x)的需求(我在這裏簡化了take(x)的邏輯)。

在你的情況下,take(1)需要一個分區,所以只有一個分區被物化和緩存。然後,當你做一個count()所有的分區需要實現並緩存到可用內存允許的範圍內。