2016-04-26 38 views
1

我有一個非常長(幾十億行)和寬得多(幾百列)的RDD。我想創建每列中唯一值的集合(這些集合不需要並行化,因爲每列中包含的值不會超過500個)。在PySpark中爲RDD中的每列找到不同的值

這是我到目前爲止有:

data = sc.parallelize([["a", "one", "x"], ["b", "one", "y"], ["a", "two", "x"], ["c", "two", "x"]]) 
num_columns = len(data.first()) 
empty_sets = [set() for index in xrange(num_columns)] 
d2 = data.aggregate((empty_sets), (lambda a, b: a.add(b)), (lambda x, y: x.union(y))) 

我在做什麼這裏正試圖initate的空集列表,一個在我的RDD每一列。對於聚合的第一部分,我想通過data逐行進行迭代,將列n中的值添加到集列表中的第n集。如果這個值已經存在,它就不會執行任何操作。然後,它在之後執行集合的union,以便在所有分區中只返回不同的值。

當我嘗試運行這段代碼,我得到以下錯誤:

AttributeError: 'list' object has no attribute 'add'

我認爲問題是,我無法準確,清楚,我通過集列表迭代( empty_sets),並且我遍歷data中每行的列。我相信在(lambda a, b: a.add(b))aempty_setsbdata.first()(整行,不是一個單一的值)。這顯然不起作用,並不是我想要的聚合。

如何遍歷我的列表集合,並通過我的數據框的每一行,將每個值添加到其對應的set對象?

所需的輸出將如下所示:

[set(['a', 'b', 'c']), set(['one', 'two']), set(['x', 'y'])]


PS我看了這個例子here,這是非常相似,我的使用情況下(這是在那裏我得到了主意,用首先是aggregate)。但是,我發現代碼很難轉換成PySpark,我很不清楚代碼正在做什麼casezip

回答

1

有兩個問題。一,你的組合函數假設每一行都是一個集合,但是你正在一組集合上進行操作。兩個,add不返回任何東西(嘗試a = set(); b = a.add('1'); print b),所以你的第一個組合函數返回一個列表None s。要解決這個問題,要使你的第一個組合器函數不是匿名的,並讓它們都循環遍歷集合列表:

def set_plus_row(sets, row): 
    for i in range(len(sets)): 
     sets[i].add(row[i]) 
    return sets 


unique_values_per_column = data.aggregate(
    empty_sets, 
    set_plus_row, # can't be lambda b/c add doesn't return anything 
    lambda x, y: [a.union(b) for a, b in zip(x, y)] 
) 

我不知道什麼zip確實在Scala中,但在Python,它需要兩個列表,並把每個對應元素一起放入元組(試行x = [1, 2, 3]; y = ['a', 'b', 'c']; print zip(x, y);),這樣你可以遍歷同時兩個列表。