2017-03-12 139 views
0

我正在嘗試使用count()方法計算以下RDD元素。首先是如下:RDD [數組[字符串]]與RDD [字符串]的計數速度

scala> val data_wo_header=dropheader(data) 
data_wo_header: org.apache.spark.rdd.RDD[String] 

當我做這個數,我得到:

scala> data_wo_header.count() 
res1: Long = 20000263 

這種操作比較快,需時約26sec

現在我變換了RDD如下:

scala> val ratings_split = data_wo_header.map(line => line.split(",")).persist() 
ratings_split: org.apache.spark.rdd.RDD[Array[String]] 

scala> ratings_split.count() 
res2: Long = 20000263 

這個計算大約需要5分鐘。有人可以建議爲什麼讀數的時間如此顯着地增加了嗎? 的drop header功能看起來這只是下降的第一行:

def dropheader(data: RDD[String]): RDD[String] = { 
    data.mapPartitionsWithIndex((idx, lines) => { 
    if (idx == 0) { 
    lines.drop(1) 
    } 
    lines 
    }) 
    } 

data只是val data = sc.textFile(file, 2).cache()

回答

1

第二個明顯更長,因爲您不僅計數行,還將每行轉換爲一個字符串數組。

在沒有選項的情況下使用persist()意味着它使用MEMORY_ONLY,因此與使用cache()完全相同。

現在5分鐘似乎很昂貴,但它取決於您的配置(總內存,CPU),而且還取決於每行元素的數量。

正如Chobeat所說,您需要使用Spark UI進行調查。

0

好了,這是您更容易通過查看星火UI驗證,並查看採取階段更多時間。數據上的地圖可能需要一些時間才能覆蓋整個數據集,並解釋了減速。另外persist()可能會引入一些開銷,但我不確定。

我的建議是使用CSV數據源讀取該CSV(如果可以的話)。