2015-11-09 70 views
1

我有一個成員列表,其中有許多屬性,其中兩個是名稱和ID。我希望獲得RDD中的元組列表。元組將包含作爲第一個元素的ID,並將包含與該ID關聯的unique個數的計數作爲第二個元素。爲每個密鑰計算唯一值的有效方法

例如如:ID, <# of unique names associated with ID>

這裏是我寫來完成這個代碼:

IDnametuple = members.map(lambda a: (a.ID, a.name)) # extract only ID and name 
idnamelist = IDnametuple.groupByKey()     # group the IDs together 
idnameunique_count = (idnamelist 
    # set(tup[1]) should extract unique elements, 
    # and len should tell the number of them 
    .map(lambda tup: (tup[0], len(set(tup[1]))))) 

這是令人難以置信的慢,比數爲每個成員的獨特屬性相似的操作慢得多。

有沒有更快的方法來做到這一點?我嘗試儘可能多地使用內置插件,這是從我聽到的內容中加速實現的正確方法。

回答

3

沒有任何細節,我們只能猜測,但明顯的選擇是groupByKey。如果每個ID都與大量名稱相關聯,則由於廣泛的洗牌而可能非常昂貴。最簡單的改進是aggregateByKeycombineByKey

create_combiner = set 

def merge_value(acc, x): 
    acc.add(x) 
    return acc 

def merge_combiners(acc1, acc2): 
    acc1.update(acc2) 
    return acc1 

id_name_unique_count = (id_name_tuple # Keep consistent naming convention 
    .combineByKey(create_combiner, merge_value, merge_combiners) 
    .mapValues(len)) 

如果唯一值的預期數量很大,你可能更喜歡以取代近似精確的方法。一種可能的方法是使用Bloom過濾器來跟蹤唯一值而不是set

有關其他信息有關groupByKey VS aggregateByKeyreduceByKeycombineByKey)見:

1

這基本上是單詞計數示例的https://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs,但是計數不同鍵值對:

from operator import add 
IDnametuple = sc.parallelize([(0, "a"),(0, "a"),(0, "b"),(1, "a"),(1, "b"),(1, "c"),(2, "z")]) 
idnameunique_count = (IDnametuple.distinct() 
            .map(lambda idName : (idName[0], 1)) 
            .reduceByKey(add)) 

所以idnameunique_count.collect()返回[(0, 2), (1, 3), (2, 1)]其中(0, "a")只計算一次。正如@ zero323所提到的,這裏的關鍵是用reduceByKey替代groupByKey,以避免創建中間名稱列表。所有你需要的是名字計數,這是一個小得多的對象,可能是一個巨大的名單。此外,您的版本使用set()在封閉代碼中順序排除重複項,而distinct作爲分佈式並行RDD轉換執行。

+0

這種方法有一個問題 - 它必須洗牌兩次。一旦獲得'distinct'值,並且一次獲得'reduceByKey'。關於並行性......除非密鑰的數量與可用內核的數量相當,否則當您對分組數據使用set時完全相同。按順序處理每個分區。 – zero323

+0

通過使用集合作爲累加器,您只能處理數據集,其中某個鍵的唯一值集適合單個工作人員的內存。使用我的解決方案,您沒有這個限制,因爲您只爲每個密鑰存儲一個數字,如「避免GroupByKey」中所述。您的解決方案基本上是使用set重新實現groupByKey。我不明白你對密鑰和核心之間關係的評論。 – juanrh0011

+0

確實有這個限制。要執行'distinct',你必須'reduceByKey',只有在重複次數很大時才執行。在這種情況下'combineByKey'應該已經減少了數據量。否則,第一個潛在的失敗點是混洗和存儲'((k,v),null)對,它們必須適合內存。接下來進行另一次洗牌,平均需要再次洗牌大部分數據。這是你實際上通過計數獲得的部分。關於核心關係。 – zero323

相關問題