我正在使用Spark(在Scala中)來讀取包含用戶和頁面共享列表的文件,並且我想查找與給定用戶相距一定距離的所有用戶通過他們共享的頁面。處理大量輸入時Spark性能非常低
程序運行非常糟糕,而且經常出現GC overhead limit exceeded
錯誤。
我在Mac OSX上使用8 GB的RAM在本地運行Spark。程序使用提交,參數--driver-memory 5g
和8個核心通過設置spark.cores.max
分配。輸入集是1.15 GB的文件。
有沒有人有跡象表明哪種操作效率很低,如果有更好的替代方案?
在此先感謝。
代碼在這裏簡要描述。
每個用戶條目包含他/她一個選項卡後,共享頁面,每個條目由兩個換行,像這樣分離:
John Doe <tab> Page 1
<tab> Page 2
<tab> Page 3
User 2 <tab> ...
首先,我讀使用newAPIHadoopFile
輸入文件。
val hdpConf = new Configuration(sc.hadoopConfiguration)
hdpConf.set("textinputformat.record.delimiter", "\n\n")
val hadoopFile = sc.newAPIHadoopFile("user_pages.list", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], hdpConf)
現在我把它變成對(user, Array(pagesShared))
像這樣
val pagesPerUser = hadoopFile.map {
line =>
val line_splitted = line._2.toString.split("\t");
(line_splitted(0), line_splitted.drop(1).mkString.split("\n"))
}
然後我創建一個包含單(k,v)
雙爲每一位用戶和頁面組合(page, user)
的RDD。
val pageAndUser = pagesPerUser.flatMap(line => line._2.map(page => (line._1, page)))
.map(...)
.filter(...)
的map
使用replaceAll
過濾網頁標題,和filter
去除含有包含引號以及使用matches()
,以檢查是否滿足標題一些多個標準正則表達式某些標題任何條目。
然後,我創建直接鏈接到另一個用戶(user, user)
的每個用戶對,然後將其轉換爲格式爲(user, Array(user))
(包含所有直接連接的用戶通過共享相同頁面)的RDD。
val pageAndUsers = pageAndUser.groupByKey.mapValues(_.toArray)
.map(line => line._2)
val commonUsers = pageAndUsers.flatMap(users => users.map(user => (user, users)))
.reduceByKey(_ ++ _).cache()
.map(users => (users._1, users._2.distinct))
這RDD然後可以用於更進一步確定用戶之間的距離,但我覺得性能下降,主要是在這些地區之一。
Spark UI顯示在確定commonUsers
時,程序似乎在reduceByKey
和map
步驟中執行緩慢。我確定它的表現是慢慢地通過與同伴程序員的解決方案進行比較。此外,我經常得到一個GC溢出/堆空間超過錯誤,這將表明我的代碼中發生了一些內存泄漏。
編輯: 經過一番深入調查,我敢肯定問題出在reduceByKey(_++_)
一步。我嘗試使用groupByKey
代替,但程序似乎在我身上失敗,並且每次都在該特定點上崩潰。
首先,請告訴您的配置和你是如何提供它和你在集羣模式或客戶端模式下使用呢? –
我在Mac OSX上使用8 GB的RAM在本地運行Spark。程序使用'spark-submit'提交,參數爲'--driver-memory 5g',通過設置'spark.cores.max'分配8個內核。 – Laurens
什麼是慢?比較什麼?哪些階段表現不佳? (你可以在火花UI頁面看到) – maasg