2014-10-07 41 views
3

我試圖在兩個Spark RDD上進行連接。我有一個鏈接到類別的交易日誌。我已經格式化了我的事務RDD,並將類別ID作爲關鍵字。Spark加速指數緩慢

transactions_cat.take(3) 
[(u'707', [u'86246', u'205', u'7', u'707', u'1078778070', u'12564', u'2012-03-02 00:00:00', u'12', u'OZ', u'1', u'7.59']), 
(u'6319', [u'86246', u'205', u'63', u'6319', u'107654575', u'17876', u'2012-03-02 00:00:00', u'64', u'OZ', u'1', u'1.59']), 
(u'9753', [u'86246', u'205', u'97', u'9753', u'1022027929', u'0', u'2012-03-02 00:00:00', u'1', u'CT', u'1', u'5.99'])] 

categories.take(3) 
[(u'2202', 0), (u'3203', 0), (u'1726', 0)] 

事務日誌是大約20 GB(350百萬的行)。 類別列表小於1KB。

當我運行

transactions_cat.join(categories).count() 

星火開始很慢。我有一個有643個任務的階段。前10項任務約需1分鐘。然後每個任務變得越來越慢(約60分鐘左右)。我不確定有什麼問題。

請檢查這些截圖以獲得更好的主意。 enter image description here enter image description here enter image description here

我正在星火1.1.0與使用Python外殼50 GB的總內存4名工人。 計算交易RDD只是相當快(30分鐘)

回答

7

有什麼不對?可能是Spark沒有注意到你有一個簡單的連接問題。當你加入的兩個RDD之一是如此之小,你最好不要是RDD。然後你可以推出你自己的hash join實現,這實際上比聽起來簡單得多。基本上,你需要:

  • 類別列表出來使用collect()RDD的拉 - 所產生的通信會很容易地爲自己支付(或者,如果可能的話,不讓它的RDD擺在首位)
  • 把它變成一個哈希表中包含的所有值一個關鍵的一個條目(假設你的鑰匙是不是唯一的)
  • 對於每一對在​​大RDD,查找關鍵了在哈希表中,併產生一對對於列表中的每個值(如果未找到,則該特定對不會產生任何結果)

我有一個implementation in Scala - 隨意提出翻譯它的問題,但它應該很容易。

另一個有趣的可能性是嘗試使用Spark SQL。我很確定這個項目的長期目標是自動爲你做這件事,但我不知道他們是否已經實現了。