
I have set up my data to compute sums and sums of squares with aggregateByKey in PySpark:

a = sc.parallelize([((1,2),(10,20,[1,3])),((1,2),(30,40,[1]))])

And I need:

  • (1,2) will be the key.
  • Since I want to compute a streaming standard deviation of the first two values, I need both the plain sum and the sum of squares for each of them. That is, for the first value sumx = (10 + 30) and sumx^2 = (10^2 + 30^2), and for the second value sumx = (20 + 40) and sumx^2 = (20^2 + 40^2); see the sketch after the expected result for how the standard deviation is recovered from these.
  • For the final value (the list), I just want to concatenate the lists.

The final result must be:

[((1,2), (40, 1000, 60, 2000, [1,3]))]
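For context, sums and sums of squares suffice because of the identity Var(x) = E[x^2] - (E[x])^2. A minimal sketch of recovering the standard deviation afterwards (the helper name and the separately tracked count n are my additions, not part of the question's code):

import math

def stddev_from_sums(s, sq, n):
    # Population standard deviation from a running sum `s`, a running
    # sum of squares `sq`, and a count `n` (the count must be tracked
    # separately; the aggregation below does not carry it).
    mean = s / n
    return math.sqrt(sq / n - mean * mean)

# For the first value above: stddev_from_sums(40.0, 1000.0, 2) -> 10.0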

Here is my code:

a.aggregateByKey((0.0, 0.0, 0.0, 0.0, []),
                 lambda x, y: (x[0] + y[0],
                               x[0] * x[0] + y[0] * y[0],
                               x[1] + y[1],
                               x[1] * x[1] + y[1] * y[1],
                               x[2] + y[2]),
                 lambda rdd1, rdd2: (rdd1[0] + rdd2[0],
                                     rdd1[1] + rdd2[1],
                                     rdd1[2] + rdd1[2],
                                     rdd1[3] + rdd2[3],
                                     rdd1[4] + rdd2[4])).collect()

Unfortunately, this returns the following error: "TypeError: unsupported operand type(s) for +: 'float' and 'list'"

Can anyone shed some light on this?
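For reference, the TypeError comes from the sequence lambda indexing the five-slot accumulator as if it had the three-slot shape of an incoming value: with accumulator x = (0.0, 0.0, 0.0, 0.0, []) and value y = (10, 20, [1, 3]), the term x[2] + y[2] adds a float to a list. (The combine lambda also has a typo: rdd1[2] + rdd1[2] should be rdd1[2] + rdd2[2].) A minimal corrected sketch, with my own parameter names; note that plain concatenation yields [1, 3, 1] for the list rather than the [1,3] stated above:

result = a.aggregateByKey(
    (0.0, 0.0, 0.0, 0.0, []),
    # seqFunc: fold one (v1, v2, list) value into the five-slot accumulator
    lambda acc, v: (acc[0] + v[0],
                    acc[1] + v[0] * v[0],
                    acc[2] + v[1],
                    acc[3] + v[1] * v[1],
                    acc[4] + v[2]),
    # combFunc: merge two partial accumulators slot by slot
    lambda a1, a2: (a1[0] + a2[0],
                    a1[1] + a2[1],
                    a1[2] + a2[2],
                    a1[3] + a2[3],
                    a1[4] + a2[4])
).collect()
# -> [((1, 2), (40.0, 1000.0, 60.0, 2000.0, [1, 3, 1]))]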

Answer


You can work around this with a HiveContext:

from pyspark.sql.context import HiveContext 
hivectx = HiveContext(sc) 

a = sc.parallelize([((1,2),(10,20,[1,3])),((1,2),(30,40,[1]))]) 

# Convert this to a dataframe 
b = a.toDF(['col1','col2']) 

# Explode col2 into individual columns (via the underlying RDD, which
# also works on Spark 2.x, where DataFrame.map was removed)
c = b.rdd.map(lambda x: (x.col1, x.col2[0], x.col2[1], x.col2[2])).toDF(['col1', 'col21', 'col22', 'col23'])

c.registerTempTable('mydf') 

sql = """ 
select col1, 
sum(col21) as sumcol21, 
sum(POW(col21,2)) as sum2col21, 
sum(col22) as sumcol22, 
sum(POW(col22,2)) as sum2col22, 
collect_set(col23) as col23 
from mydf 
group by col1 
""" 
d = hivectx.sql(sql) 

# Get back your original (key, value) layout, flattening the collected lists
e = d.rdd.map(lambda x: (x.col1, (x.sumcol21, x.sum2col21, x.sumcol22, x.sum2col22, [item for sublist in x.col23 for item in sublist]))).toDF(['col1', 'col2'])
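A quick way to sanity-check the result (a sketch; exact Row rendering varies by Spark version, and collect_set does not guarantee the order of the merged lists):

for row in e.collect():
    print(row.col1, row.col2)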