2016-12-09 16 views
0

我正在使用PySpark,並且正在尋找一種使用多次groupByKey/mapValues方法的方法。Pyspark在一行中多次使用groupByKey/mapValues

考慮:

rdd = sc.parallelize([(u'04896f3765094732a478ba63dd42c785', 
         u'2016-01-01', 
         u'2', 
         u'1404.0', 
         u'2016-333', 
         u'2016-48', 
         u'2016-11', 
         '2016-11-28'), 

         (u'kldmm584753dljkdhggdklkfj32a478ba63dd422574', 
         u'2016-01-14', 
         u'6', 
         u'2000.0', 
         u'2016-333', 
         u'2016-48', 
         u'2016-11', 
         '2016-11-28') 

        ]) 

我想我的組由4單元RDD( '2016-333' 在這裏),並獲得LEN,金額等。 我的代碼:

(rdd 
.map(lambda x : (x[4], x[0])) 
.groupByKey() 
.mapValues(len) 
.collect()) 

Output : [(u'2016-333', 2)] 

(rdd 
.map(lambda x : (x[4], float(x[3]))) 
.groupByKey() 
.mapValues(sum) 
.collect()) 

Output : [(u'2016-333', 3404.0)] 

(rdd 
.map(lambda x : (x[4], int(x[2]))) 
.groupByKey() 
.mapValues(sum) 
.collect()) 

Output : [(u'2016-333', 8)] 

我的問題:有辦法一次完成這個嗎? 預計產量爲:

[(u'2016-333', 2, 3404.0, 8)] 

Thx!

回答

3

您可以使用reduceByKey作爲wordcount example。在這裏,你的值是一個三部分元組,你的縮減器將是元素的總和。

rdd.map(lambda x: (x[4], (1, float(x[3]), int(x[2])))).reduceByKey(lambda x,y: (x[0] + y[0], x[1] + y[1], x[2] + y[2])).collect() 
0

最簡單的:

rdd.map(lambda x: (x[4], float(x[3]), int(x[2]))).toDF(["key", "x3", "x2"]) \ 
    .groupBy("key").agg({"*": "count", "x3": "sum", "x2": "sum"}).rdd 

rdd.map(lambda x: (x[4], np.array([1, float(x[3]), int(x[2])]))) \ 
    .reduceByKey(lambda x, y: x + y) \ 
    .mapValues(lambda x: (int(x[0]) , int(x[1]), x[2])) 
+0

LostInOverflow Thx!但是如果不使用DataFrames,沒有辦法做到這一點? (只有rdds) – DataAddicted