2014-10-18 130 views
0

我在3個節點上有一個分佈式系統,我的數據分佈在這些節點中。例如,我有一個test.csv文件,它存在於所有3個節點,它包含的如何在Apache Spark中聚合數據

row | id, C1, C2, C3 
---------------------- 
row1 | A1 , c1 , c2 ,2 
row2 | A1 , c1 , c2 ,1 
row3 | A1 , c11, c2 ,1 
row4 | A2 , c1 , c2 ,1 
row5 | A2 , c1 , c2 ,1 
row6 | A2 , c11, c2 ,1 
row7 | A2 , c11, c21,1 
row8 | A3 , c1 , c2 ,1 
row9 | A3 , c1 , c2 ,2 
row10 | A4 , c1 , c2 ,1 

4列我想嘗試彙總上述結果集。如何彙總由id,c1,c2c3列設置的數據並將其輸出爲這樣?

row | id, C1, C2, C3 
---------------------- 
row1 | A1 , c1 , c2 ,3 
row2 | A1 , c11, c2 ,1 
row3 | A2 , c1 , c2 ,2 
row4 | A2 , c11, c2 ,1 
row5 | A2 , c11, c21,1 
row6 | A3 , c1 , c2 ,3 
row7 | A4 , c1 , c2 ,1 

我試過如下:

from array import array 
from datetime import datetime 
import pyspark.sql 
from pyspark.sql import Row, SQLContext, StructField, StringType, IntegerType 

schema = StructType([ 
    StructField("id", StringType(), False), 
    StructField("C1", StringType(), False), 
    StructField("C2", StringType(), False), 
    StructField("C3", IntegerType(), False)]) 
base_rdd = sc.textFile("/home/hduser/spark-1.1.0/Data/test.tsv").map(lambda l: 

l.split(",") 

rdd = base_rdd.map(lambda x: Row(id = x[0], C1 = x[1], C2 = x[2], C3 = int(x[3]))) 
sqlContext = SQLContext(sc) 
srdd = sqlContext.inferSchema(rdd) 
+0

您能否顯示錯誤? – 2015-06-16 05:45:11

回答

0

解決你的問題,你可以做下面的步驟。我不知道python步驟,下面是java步驟。我希望你能把它和python聯繫起來。

  1. 讀csv文件

JavaRDD<String> input = sc.textFile(args[0]);

  • 創建一對從文件RDD

    JavaPairRDD<Integer,String> pairMap = input.mapToPair( new PairFunction<String, Integer, String>() { @Override public Tuple2<Integer, String> call(String line) throws Exception { String[] s = line.split(","); String key = s[0]+'#'+s[1]+'#' +s[2];// id,c1,c2 Integer value = Integer.valueOf(s[3]) //c3
    return new Tuple2<Integer,String>(key, value); } });

  • 減少按鑰匙地圖

  • JavaPairRDD<String,Integer> result = pairMap.reduceByKey( new Function2<Integer, Integer, Integer>() {
    @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } });

  • result對象包含您的預期的結果,其中關鍵是id+'#'+c1+'#'+c2和值被聚集c3。你可以進一步使用這張地圖。您可以在#上標記密鑰以獲取您的列,並可以使用apache-spark-sql將其插入到表中。
  • 我希望這會有所幫助。

    0

    首先,我建議使用'com.databricks.spark.csv'來讀取csv文件(當您運行pyspark shell http://spark-packages.org/package/databricks/spark-csv時,您需要使用'--package'來加載它)。然後使用GROUPBY方法:

    df = (sqlContext.read 
        .format('com.databricks.spark.csv') 
        .option("inferSchema", "true") 
        .option("header", "true") 
        .load("<your_file>.csv")) 
    
    df2= df.groupBy('id', 'C1', 'C2').agg({'C3': 'sum'}).sort('id', 'C1') 
    
    df.show() 
    +---+---+---+---+ 
    | id| C1| C2| C3| 
    +---+---+---+---+ 
    | A1| c1| c2| 2| 
    | A1| c1| c2| 1| 
    | A1|c11| c2| 1| 
    | A2| c1| c2| 1| 
    | A2| c1| c2| 1| 
    | A2|c11| c2| 1| 
    | A2|c11|c21| 1| 
    | A3| c1| c2| 1| 
    | A3| c1| c2| 2| 
    | A4| c1| c2| 1| 
    +---+---+---+---+ 
    
    df2.show() 
    
    +---+---+---+-------+ 
    | id| C1| C2|sum(C3)| 
    +---+---+---+-------+ 
    | A1| c1| c2|  3| 
    | A1|c11| c2|  1| 
    | A2| c1| c2|  2| 
    | A2|c11| c2|  1| 
    | A2|c11|c21|  1| 
    | A3| c1| c2|  3| 
    | A4| c1| c2|  1| 
    +---+---+---+-------+ 
    

    如果標籤「行」是非常重要的,你可以在以後添加它,並重新命名「SUM(C3)」到「C3」。有關更多信息,請參閱Spark Python API https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame