2016-10-23 34 views
0

我正在使用Spark(在Scala中)來讀取包含用戶和頁面共享列表的文件,並且我想查找與給定用戶相距一定距離的所有用戶通過他們共享的頁面。處理大量輸入時Spark性能非常低

程序運行非常糟糕,而且經常出現GC overhead limit exceeded錯誤。

我在Mac OSX上使用8 GB的RAM在本地運行Spark。程序使用​​提交,參數--driver-memory 5g和8個核心通過設置spark.cores.max分配。輸入集是1.15 GB的文件。

有沒有人有跡象表明哪種操作效率很低,如果有更好的替代方案?

在此先感謝。

代碼在這裏簡要描述。

每個用戶條目包含他/她一個選項卡後,共享頁面,每個條目由兩個換行,像這樣分離:

John Doe <tab> Page 1 
      <tab> Page 2 
      <tab> Page 3 

User 2  <tab> ... 

首先,我讀使用newAPIHadoopFile輸入文件。

val hdpConf = new Configuration(sc.hadoopConfiguration) 
hdpConf.set("textinputformat.record.delimiter", "\n\n") 
val hadoopFile = sc.newAPIHadoopFile("user_pages.list", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], hdpConf) 

現在我把它變成對(user, Array(pagesShared))像這樣

val pagesPerUser = hadoopFile.map { 
    line => 
     val line_splitted = line._2.toString.split("\t"); 
     (line_splitted(0), line_splitted.drop(1).mkString.split("\n")) 
} 

然後我創建一個包含單(k,v)雙爲每一位用戶和頁面組合(page, user)的RDD。

val pageAndUser = pagesPerUser.flatMap(line => line._2.map(page => (line._1, page))) 
    .map(...) 
    .filter(...) 

map使用replaceAll過濾網頁標題,和filter去除含有包含引號以及使用matches(),以檢查是否滿足標題一些多個標準正則表達式某些標題任何條目。

然後,我創建直接鏈接到另一個用戶(user, user)的每個用戶對,然後將其轉換爲格式爲(user, Array(user))(包含所有直接連接的用戶通過共享相同頁面)的RDD。

val pageAndUsers = pageAndUser.groupByKey.mapValues(_.toArray) 
    .map(line => line._2) 
val commonUsers = pageAndUsers.flatMap(users => users.map(user => (user, users))) 
    .reduceByKey(_ ++ _).cache() 
    .map(users => (users._1, users._2.distinct)) 

這RDD然後可以用於更進一步確定用戶之間的距離,但我覺得性能下降,主要是在這些地區之一。

Spark UI顯示在確定commonUsers時,程序似乎在reduceByKeymap步驟中執行緩慢。我確定它的表現是慢慢地通過與同伴程序員的解決方案進行比較。此外,我經常得到一個GC溢出/堆空間超過錯誤,這將表明我的代碼中發生了一些內存泄漏。

編輯: 經過一番深入調查,我敢肯定問題出在reduceByKey(_++_)一步。我嘗試使用groupByKey代替,但程序似乎在我身上失敗,並且每次都在該特定點上崩潰。

+0

首先,請告訴您的配置和你是如何提供它和你在集羣模式或客戶端模式下使用呢? –

+0

我在Mac OSX上使用8 GB的RAM在本地運行Spark。程序使用'spark-submit'提交,參數爲'--driver-memory 5g',通過設置'spark.cores.max'分配8個內核。 – Laurens

+3

什麼是慢?比較什麼?哪些階段表現不佳? (你可以在火花UI頁面看到) – maasg

回答

1

執行reduceByKey並使用它來組合可能增長到無限大小的數據是很危險的。例如,它看起來好像是鏈接在某種意義上共享頁面的用戶。但是如果你的一個用戶與所有其他用戶相關聯,該怎麼辦?然後,你試圖在你的reduceByKey中構造的數組會變得非常大。這是你的記憶和GC問題的根源。

我期待如果你在這個舞臺運行時看着Spark UI,你會看到一些掛起的任務。這些將是你有一個單一的用戶鏈接到許多用戶。 (也許所有用戶都會掛在這種情況下,所有用戶都鏈接到所有用戶)。

我會在您的reduceByKey(「pageAndUsers」RDD)之前保存您的數據,然後查詢該數據以查看發生了什麼。

也許如果你總共有一小部分用戶,你可以使用一個集合而不是一個數組,因爲這會隨着它的變化自動地將你的用戶的價值「分開」,所以它可能不會增長太大(取決於你的數據)。

但是,您需要查看數據以瞭解問題。使用我只是在這裏提到的一些例子(不完全快)代碼集的邏輯:

val pageAndUsers = pageAndUser.groupByKey.mapValues(_.toSet) 
    .map(line => line._2) 
val commonUsers = pageAndUsers.flatMap(users => users.map(user => (user, users))) 
    .reduceByKey(_ ++ _).cache() 
+0

正如你所建議的,我試着保存'pageAndUsers' RDD並執行'count'。 Spark GUI指出它的大小約爲1 GB,這對我來說看起來很好。當我對包含'reduceByKey'的步驟做同樣的處理時,我發現數據大小會大量增加並將數據泄漏到磁盤。所以'reduceByKey'確實會產生大量的數據。什麼是改善其性能的好方法? – Laurens

+0

它不是一個真正的性能問題。如果你離開它幾天,它最終會因爲花費在GC上的所有時間而失敗。處理一個非常大的分區非常挑剔,取決於你想要的重要性以及你的數據是什麼樣的。現在,我會查詢您的數據以找出「不良」網頁,然後在您的過濾器中將其過濾掉。很可能你不希望每個用戶都鏈接到每一個用戶,因爲這個頁面的原因很多。 –