2016-08-20 60 views
0

我的問題與PySpark reduceByKey on multiple values類似,但有一點關鍵的區別。我是PySpark的新手,所以我肯定錯過了一些明顯的東西。Pyspark在嵌套元組上reduceByKey

我有以下結構的RDD:

(K0, ((k01,v01), (k02,v02), ...)) 
.... 
(Kn, ((kn1,vn1), (kn2,vn2), ...)) 

我想作爲一個輸出是一樣的東西

(K0, v01+v02+...) 
... 
(Kn, vn1+vn2+...) 

這似乎像是一個完美的情況下使用reduceByKey,起初我還以爲類似的東西

rdd.reduceByKey(lambda x,y: x[1]+y[1]) 

這給了我完全RDD我開始用。我想我的索引有問題,因爲有嵌套元組,但我嘗試了所有可能的索引組合,我一直在想,並且一直給我回初始的RDD。

是否有理由不應該使用嵌套元組或者我做錯了什麼?

回答

0

這裏根本不應該使用reduceByKey。它需要一個具有簽名的關聯和交換函數。 (T, T) => T。很顯然,當您將List[Tuple[U, T]]作爲輸入並且您期望T作爲輸出時,它不適用。

由於不清楚是否鍵或唯一或不讓我們考慮一般的例子,當我們必須在本地和全球聚合。讓我們假設v01v02,... vm是簡單的數學運算:

from functools import reduce 
from operator import add 

def agg_(xs): 
    # For numeric values sum would be more idiomatic 
    # but lets make it more generic 
    return reduce(add, (x[1] for x in xs), zero_value) 

zero_value = 0 
merge_op = add 
def seq_op(acc, xs): 
    return acc + agg_(xs) 

rdd = sc.parallelize([ 
    ("K0", (("k01", 3), ("k02", 2))), 
    ("K0", (("k03", 5), ("k04", 6))), 
    ("K1", (("k11", 0), ("k12", -1)))]) 

rdd.aggregateByKey(0, seq_op, merge_op).take(2) 
## [('K0', 16), ('K1', -1)] 

如果項已獨特的簡單mapValues就足夠了:

from itertools import chain 

unique_keys = rdd.groupByKey().mapValues(lambda x: tuple(chain(*x))) 
unique_keys.mapValues(agg_).take(2) 
## [('K0', 16), ('K1', -1)] 
+0

那現在我很清楚。是的,鍵是唯一的,所以mapValues方法就是我所需要的。非常感謝你。 –