2017-01-11 34 views
0

我擁有RDD中的數據,其中有4列,如geog,產品,時間和價格。我想根據geog和時間來計算運行總和。如何使用Spark scala獲得基於兩列的運行總和RDD

給定的數據

Source

我需要的結果等。

[Need Output like this]

我需要這個火花斯卡拉-RDD。我是這個Scala世界的新手,我可以在SQL中輕鬆實現這一點。我想要在火花-Scala -RDD中使用(地圖,平面地圖)。

高級謝謝你的幫助。

+0

任何代碼會一直激勵我們。 –

+0

您必須使用累加器來跟蹤總和。 – code

+0

@balaji您可以將RDD轉換爲DataFrame,並重復使用現有的SQL :) –

回答

2

這是可以通過定義一個窗口函數:

>>> val data = List(
    ("India","A1","Q1",40), 
    ("India","A2","Q1",30), 
    ("India","A3","Q1",21), 
    ("German","A1","Q1",50), 
    ("German","A3","Q1",60), 
    ("US","A1","Q1",60), 
    ("US","A2","Q2",25), 
    ("US","A4","Q1",20), 
    ("US","A5","Q5",15), 
    ("US","A3","Q3",10) 
) 

>>> val df = sc.parallelize(data).toDF("country", "part", "quarter", "result") 
>>> df.show() 

+-------+----+-------+------+ 
|country|part|quarter|result| 
+-------+----+-------+------+ 
| India| A1|  Q1| 40| 
| India| A2|  Q1| 30| 
| India| A3|  Q1| 21| 
| German| A1|  Q1| 50| 
| German| A3|  Q1| 60| 
|  US| A1|  Q1| 60| 
|  US| A2|  Q2| 25| 
|  US| A4|  Q1| 20| 
|  US| A5|  Q5| 15| 
|  US| A3|  Q3| 10| 
+-------+----+-------+------+ 

>>> val window = Window.partitionBy("country").orderBy("part", "quarter") 
>>> val resultDF = df.withColumn("agg", sum(df("result")).over(window)) 
>>> resultDF.show() 

+-------+----+-------+------+---+ 
|country|part|quarter|result|agg| 
+-------+----+-------+------+---+ 
| India| A1|  Q1| 40| 40| 
| India| A2|  Q1| 30| 70| 
| India| A3|  Q1| 21| 91| 
|  US| A1|  Q1| 60| 60| 
|  US| A2|  Q2| 25| 85| 
|  US| A3|  Q3| 10| 95| 
|  US| A4|  Q1| 20|115| 
|  US| A5|  Q5| 15|130| 
| German| A1|  Q1| 50| 50| 
| German| A3|  Q1| 60|110| 
+-------+----+-------+------+---+ 

爲此,您可以使用窗口功能,請大家看看Databrick博客有關Windows: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

希望這有助於。

快樂引發!歡呼,福柯

+0

感謝您的幫助。我也嘗試過RDD。 – balaji

1

我認爲這也會幫助其他人。我在SCALA RDD嘗試過。

val fileName_test_1 ="C:\\venkat_workshop\\Qintel\\Data_Files\\test_1.txt" 


    val rdd1 = sc.textFile(fileName_test_1).map { x => (x.split(",")(0).toString() , 
                  x.split(",")(1).toString(), 
                  x.split(",")(2).toString(), 
                  x.split(",")(3).toDouble 
                 ) 
                }.groupBy(x => (x._1,x._3)) 
                .mapValues 
                  { 
                   _.toList.sortWith 
                   { 
                   (a,b) => (a._4) > (b._4) 
                   }.scanLeft("","","",0.0,0.0){ 
                   (a,b) => (b._1,b._2,b._3,b._4,b._4+a._5) 
                   }.tail 
                  }.flatMapValues(f => f).values