3

I have the following data, and what I'd like to do is a PySpark reduceByKey on key/tuple pairs:

[(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')] 

What I want is to count, for each key, the occurrences of each value (a one-character string). So I first did a map:

.map(lambda x: (x[0], [x[1], 1])) 

so that the key/tuple pairs are now:

[(13, ['D', 1]), (14, ['T', 1]), (32, ['6', 1]), (45, ['T', 1]), (47, ['2', 1]), (48, ['0', 1]), (49, ['2', 1]), (50, ['0', 1]), (51, ['T', 1]), (53, ['2', 1]), (54, ['0', 1]), (13, ['A', 1]), (14, ['T', 1]), (32, ['6', 1]), (45, ['A', 1]), (47, ['2', 1]), (48, ['0', 1]), (49, ['2', 1]), (50, ['0', 1]), (51, ['X', 1])] 

I just can't figure out the last part: how to count, per key, the occurrences of each letter. For example, key 13 would have one 'D' and one 'A', while 14 would have two 'T's, and so on.

+1

You want to 'groupByKey' first, then perform the count on the grouped characters. – ohruunuruus

Answers

3

I'm more familiar with Spark in Scala, so there may be a better way than Counter to count the characters in the iterable produced by groupByKey, but here's one option:

from collections import Counter 

rdd = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]) 
# Group all characters for each key, then count them with a Counter 
rdd.groupByKey().mapValues(lambda x: Counter(x)).collect() 

[(48, Counter({'0': 2})), 
(32, Counter({'6': 2})), 
(49, Counter({'2': 2})), 
(50, Counter({'0': 2})), 
(51, Counter({'X': 1, 'T': 1})), 
(53, Counter({'2': 1})), 
(13, Counter({'A': 1, 'D': 1})), 
(45, Counter({'A': 1, 'T': 1})), 
(14, Counter({'T': 2})), 
(54, Counter({'0': 1})), 
(47, Counter({'2': 2}))] 
+2

Oh, you already used Counter! Unfortunately 'groupByKey' should be avoided, since it pulls all the values for a key together in one place; and two operations instead of one isn't great, but +1 for the compact one-liner! – ipoteka

+0

@ipoteka Interesting, I wasn't aware of the inefficiency of 'groupByKey'. Do you have a good reference that elaborates on this? – ohruunuruus

+3

http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html – ipoteka
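
For context, a minimal sketch of the difference that link describes (an editorial illustration, not from the original thread): 'reduceByKey' combines values on each partition before the shuffle, while 'groupByKey' ships every individual pair across the network first.

pairs = sc.parallelize([('a', 1), ('b', 1), ('a', 1)]) 

# Shuffles every (key, value) pair, then sums on the receiving side 
pairs.groupByKey().mapValues(sum).collect()      # [('a', 2), ('b', 1)] (order may vary) 

# Pre-aggregates per partition, so far less data crosses the network 
pairs.reduceByKey(lambda a, b: a + b).collect()  # same result 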

3

If I understand you right, you can do this in one operation with combineByKey:

from collections import Counter 
x = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]) 

def merge_value(acc, value): 
    # mergeValue: fold one more character into the per-key dict 
    acc[value] = acc.get(value, 0) + 1 
    return acc 

result = x.combineByKey(lambda value: {value: 1},  # createCombiner: first character seen for a key 
                        merge_value, 
                        lambda a, b: dict(Counter(a) + Counter(b)))  # mergeCombiners: merge per-partition dicts 
result.collect() 
[(32, {'6': 2}), (48, {'0': 2}), (49, {'2': 2}), (53, {'2': 1}), (13, {'A': 1, 'D': 1}), (45, {'A': 1, 'T': 1}), (50, {'0': 2}), (54, {'0': 1}), (14, {'T': 2}), (51, {'X': 1, 'T': 1}), (47, {'2': 2})] 
+0

Looks like with this solution 13 gets ('A', 2) instead of [('A', 1), ('D', 1)] – ohruunuruus

+0

Hmm, I had assumed 13 only corresponded to 'A'; I'll change my answer. Thanks! – ipoteka

+0

The OP needs a count for each character on each key – ohruunuruus

2

Instead of:

.map(lambda x: (x[0], [x[1], 1])) 

we can do:

.map(lambda x: ((x[0], x[1]), 1)) 

And in the last step we can use reduceByKey to add the counts up. Note that add comes from the operator package.

Putting it all together:

from operator import add 
rdd = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]) 
rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(add).collect()
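
This returns ((key, character), count) pairs such as ((14, 'T'), 2), in an order that depends on partitioning. If the per-key dicts of the other answers are wanted, one possible follow-up reshaping step (an editorial sketch; the name 'counts' is introduced here and is not part of the original answer):

counts = rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(add) 
# Re-key by the original key and wrap each count in a one-entry dict, 
# then merge the dicts; characters are unique per key after the first 
# reduce, so a plain dict union is safe. 
(counts.map(lambda kv: (kv[0][0], {kv[0][1]: kv[1]})) 
       .reduceByKey(lambda a, b: dict(list(a.items()) + list(b.items()))) 
       .collect()) 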