
Pyspark: merging values in a nested list

I have a pair RDD with the structure: [(key, [(timestring, value), ...])]

Example:

[("key1", [("20161101", 23), ("20161101", 41), ("20161102", 66),...]), 
("key2", [("20161101", 86), ("20161101", 9), ("20161102", 11),...]) 
    ...] 

For each key I want to process the list, grouping by timestring and computing the mean of all values that share the same timestring, so the example above would become:

[("key1", [("20161101", 32), ..]), 
("key2", [("20161101", 47.5),...]) 
    ...] 

I'm having trouble finding a solution that does this in a single step using only Pyspark methods. Is that possible, or do I need some intermediate steps?

Answer


You can define a function:

from itertools import groupby 
import numpy as np 

def mapper(xs): 
    # Sort by timestring so groupby sees equal timestrings adjacently, 
    # then average the values within each group. 
    return [ 
        (k, np.mean([v[1] for v in vs])) 
        for k, vs in groupby(sorted(xs), lambda x: x[0]) 
    ] 
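
As a quick sanity check (this call is illustrative, not part of the original answer), the function already produces the desired shape on a plain Python list:

mapper([("20161101", 23), ("20161101", 41), ("20161102", 66)]) 
# [('20161101', 32.0), ('20161102', 66.0)] 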

and apply it with mapValues:

rdd = sc.parallelize([ 
    ("key1", [("20161101", 23), ("20161101", 41), ("20161102", 66)]), 
    ("key2", [("20161101", 86), ("20161101", 9), ("20161102", 11)]) 
]) 

rdd.mapValues(mapper).collect() 
# [('key1', [('20161101', 32.0), ('20161102', 66.0)]), 
#  ('key2', [('20161101', 47.5), ('20161102', 11.0)])] 
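
Note that mapValues runs the grouping inside a single Python worker per key, so it is one step but not distributed within a key. If the per-key lists are very large, a more distributed variant is possible; the sketch below is my own, not from the original answer, and assumes the same rdd as above. It flattens to ((key, timestring), value) pairs, averages with aggregateByKey, and regroups per key:

averaged = ( 
    rdd 
    # flatten to ((key, timestring), value) pairs 
    .flatMap(lambda kv: [((kv[0], ts), v) for ts, v in kv[1]]) 
    # accumulate (running sum, count) per (key, timestring) 
    .aggregateByKey((0.0, 0), 
                    lambda acc, v: (acc[0] + v, acc[1] + 1), 
                    lambda a, b: (a[0] + b[0], a[1] + b[1])) 
    # turn (sum, count) into the mean 
    .mapValues(lambda sc_: sc_[0] / sc_[1]) 
    # regroup back to (key, [(timestring, mean), ...]) 
    .map(lambda kv: (kv[0][0], (kv[0][1], kv[1]))) 
    .groupByKey() 
    .mapValues(list) 
) 

This avoids materializing each key's full list on a single worker, at the cost of an extra shuffle for the final groupByKey.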