我有一個非常長(幾十億行)和寬得多(幾百列)的RDD。我想創建每列中唯一值的集合(這些集合不需要並行化,因爲每列中包含的值不會超過500個)。在PySpark中爲RDD中的每列找到不同的值
這是我到目前爲止有:
data = sc.parallelize([["a", "one", "x"], ["b", "one", "y"], ["a", "two", "x"], ["c", "two", "x"]])
num_columns = len(data.first())
empty_sets = [set() for index in xrange(num_columns)]
d2 = data.aggregate((empty_sets), (lambda a, b: a.add(b)), (lambda x, y: x.union(y)))
我在做什麼這裏正試圖initate的空集列表,一個在我的RDD每一列。對於聚合的第一部分,我想通過data
逐行進行迭代,將列n
中的值添加到集列表中的第n
集。如果這個值已經存在,它就不會執行任何操作。然後,它在之後執行集合的union
,以便在所有分區中只返回不同的值。
當我嘗試運行這段代碼,我得到以下錯誤:
AttributeError: 'list' object has no attribute 'add'
我認爲問題是,我無法準確,清楚,我通過集列表迭代( empty_sets
),並且我遍歷data
中每行的列。我相信在(lambda a, b: a.add(b))
那a
是empty_sets
和b
是data.first()
(整行,不是一個單一的值)。這顯然不起作用,並不是我想要的聚合。
如何遍歷我的列表集合,並通過我的數據框的每一行,將每個值添加到其對應的set對象?
所需的輸出將如下所示:
[set(['a', 'b', 'c']), set(['one', 'two']), set(['x', 'y'])]
PS我看了這個例子here,這是非常相似,我的使用情況下(這是在那裏我得到了主意,用首先是aggregate
)。但是,我發現代碼很難轉換成PySpark,我很不清楚代碼正在做什麼case
和zip
。