我擁有RDD中的數據,其中有4列,如geog,產品,時間和價格。我想根據geog和時間來計算運行總和。如何使用Spark scala獲得基於兩列的運行總和RDD
給定的數據
我需要的結果等。
[
我需要這個火花斯卡拉-RDD。我是這個Scala世界的新手,我可以在SQL中輕鬆實現這一點。我想要在火花-Scala -RDD中使用(地圖,平面地圖)。
高級謝謝你的幫助。
我擁有RDD中的數據,其中有4列,如geog,產品,時間和價格。我想根據geog和時間來計算運行總和。如何使用Spark scala獲得基於兩列的運行總和RDD
給定的數據
我需要的結果等。
[
我需要這個火花斯卡拉-RDD。我是這個Scala世界的新手,我可以在SQL中輕鬆實現這一點。我想要在火花-Scala -RDD中使用(地圖,平面地圖)。
高級謝謝你的幫助。
這是可以通過定義一個窗口函數:
>>> val data = List(
("India","A1","Q1",40),
("India","A2","Q1",30),
("India","A3","Q1",21),
("German","A1","Q1",50),
("German","A3","Q1",60),
("US","A1","Q1",60),
("US","A2","Q2",25),
("US","A4","Q1",20),
("US","A5","Q5",15),
("US","A3","Q3",10)
)
>>> val df = sc.parallelize(data).toDF("country", "part", "quarter", "result")
>>> df.show()
+-------+----+-------+------+
|country|part|quarter|result|
+-------+----+-------+------+
| India| A1| Q1| 40|
| India| A2| Q1| 30|
| India| A3| Q1| 21|
| German| A1| Q1| 50|
| German| A3| Q1| 60|
| US| A1| Q1| 60|
| US| A2| Q2| 25|
| US| A4| Q1| 20|
| US| A5| Q5| 15|
| US| A3| Q3| 10|
+-------+----+-------+------+
>>> val window = Window.partitionBy("country").orderBy("part", "quarter")
>>> val resultDF = df.withColumn("agg", sum(df("result")).over(window))
>>> resultDF.show()
+-------+----+-------+------+---+
|country|part|quarter|result|agg|
+-------+----+-------+------+---+
| India| A1| Q1| 40| 40|
| India| A2| Q1| 30| 70|
| India| A3| Q1| 21| 91|
| US| A1| Q1| 60| 60|
| US| A2| Q2| 25| 85|
| US| A3| Q3| 10| 95|
| US| A4| Q1| 20|115|
| US| A5| Q5| 15|130|
| German| A1| Q1| 50| 50|
| German| A3| Q1| 60|110|
+-------+----+-------+------+---+
爲此,您可以使用窗口功能,請大家看看Databrick博客有關Windows: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
希望這有助於。
快樂引發!歡呼,福柯
感謝您的幫助。我也嘗試過RDD。 – balaji
我認爲這也會幫助其他人。我在SCALA RDD嘗試過。
val fileName_test_1 ="C:\\venkat_workshop\\Qintel\\Data_Files\\test_1.txt"
val rdd1 = sc.textFile(fileName_test_1).map { x => (x.split(",")(0).toString() ,
x.split(",")(1).toString(),
x.split(",")(2).toString(),
x.split(",")(3).toDouble
)
}.groupBy(x => (x._1,x._3))
.mapValues
{
_.toList.sortWith
{
(a,b) => (a._4) > (b._4)
}.scanLeft("","","",0.0,0.0){
(a,b) => (b._1,b._2,b._3,b._4,b._4+a._5)
}.tail
}.flatMapValues(f => f).values
任何代碼會一直激勵我們。 –
您必須使用累加器來跟蹤總和。 – code
@balaji您可以將RDD轉換爲DataFrame,並重復使用現有的SQL :) –