How to aggregate data in Apache Spark?

I have a distributed system on 3 nodes, and my data is distributed among those nodes. For example, I have a test.csv file that exists on all 3 nodes and contains 4 columns of:
row | id, C1, C2, C3
----------------------
row1 | A1 , c1 , c2 ,2
row2 | A1 , c1 , c2 ,1
row3 | A1 , c11, c2 ,1
row4 | A2 , c1 , c2 ,1
row5 | A2 , c1 , c2 ,1
row6 | A2 , c11, c2 ,1
row7 | A2 , c11, c21,1
row8 | A3 , c1 , c2 ,1
row9 | A3 , c1 , c2 ,2
row10 | A4 , c1 , c2 ,1
I want to try to aggregate the above result set. How do I aggregate the data by the id, C1, C2 and C3 columns and output it like this?
row | id, C1, C2, C3
----------------------
row1 | A1 , c1 , c2 ,3
row2 | A1 , c11, c2 ,1
row3 | A2 , c1 , c2 ,2
row4 | A2 , c11, c2 ,1
row5 | A2 , c11, c21,1
row6 | A3 , c1 , c2 ,3
row7 | A4 , c1 , c2 ,1
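I think something like this rough sketch with the plain RDD API might give that output: key each row by (id, C1, C2) and sum C3 with reduceByKey. It assumes the same sc and file path as my attempt further down, and I have not verified it:

lines = sc.textFile("/home/hduser/spark-1.1.0/Data/test.tsv")

# Strip the spaces around each field, key by (id, C1, C2) and sum C3 per key.
aggregated = (lines
    .map(lambda l: [f.strip() for f in l.split(",")])
    .map(lambda x: ((x[0], x[1], x[2]), int(x[3])))
    .reduceByKey(lambda a, b: a + b)
    .sortByKey())

for (id_, c1, c2), c3 in aggregated.collect():
    print("%s,%s,%s,%d" % (id_, c1, c2, c3))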
Here is what I have tried so far:
from array import array
from datetime import datetime
import pyspark.sql
from pyspark.sql import Row, SQLContext, StructType, StructField, StringType, IntegerType

# Schema for the four columns of the file.
schema = StructType([
    StructField("id", StringType(), False),
    StructField("C1", StringType(), False),
    StructField("C2", StringType(), False),
    StructField("C3", IntegerType(), False)])

# Read the raw file and split each line on commas.
base_rdd = sc.textFile("/home/hduser/spark-1.1.0/Data/test.tsv").map(lambda l: l.split(","))

# Turn every split line into a Row, with C3 cast to int.
rdd = base_rdd.map(lambda x: Row(id=x[0], C1=x[1], C2=x[2], C3=int(x[3])))

sqlContext = SQLContext(sc)

# Infer the schema from the Rows to get a SchemaRDD.
srdd = sqlContext.inferSchema(rdd)
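From srdd I think the grouping could then be finished with a temporary table and a GROUP BY query, roughly like the sketch below (the table name records is just a placeholder I picked):

# Register the SchemaRDD as a temporary table and aggregate with SQL.
srdd.registerTempTable("records")

result = sqlContext.sql(
    "SELECT id, C1, C2, SUM(C3) AS C3 FROM records GROUP BY id, C1, C2")

# Each result row is tuple-like: (id, C1, C2, summed C3).
for r in result.collect():
    print("%s,%s,%s,%d" % (r[0], r[1], r[2], r[3]))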
Could you show us the error? – 2015-06-16 05:45:11