2016-03-02 51 views
1

,我有一個RDD它看起來像這樣計數使用<em>pyspark</em>

[("a", 0), ("b", 1), ("a", 1), ("a", 0)] 

我希望做的是建立另一個RDD與計數第一個字段基於第三個字段。這樣有效地將是:

[("a", 0, 2), ("a", 1, 1), ("b", 1, 1)] 

這意味着有兩個實例「一」與第三字段等於0,且存在的一個實例「A」與第三字段等於1,並且有一個實例「b」與第三字段等於1

的我可以很容易地通過使用reduceByKey作爲

RDD = sc.parallelize([(「一」獲得第一場的不同計數0,2),(「a」,1,1),(「b」,1,1)])

.MAP(拉姆達行:(行[0],1))

.reduceByKey(添加)

但這隻會給我的 「A」 和 「B」 的計數不管第三場。我將如何獲得這個呢?

回答

2

如果將其理解你的問題很好,你很可能在尋找這樣的事情:

from operator import add 

rdd = sc.parallelize([("a", 0), ("b", 1), ("a", 1), ("a", 0)]) 
     .map(lambda row: ((row[0],row[1]), 1)) 
     .reduceByKey(add) 
     .map(lambda row : (row[0][0],row[0][1],row[1])) 
print(rdd.collect()) 

# [('a', 1, 1), ('a', 0, 2), ('b', 1, 1)]