0

我有一個數據框,它具有幾個屬性(C1到C2),偏移量(以天爲單位)和幾個值(V1,V2)。Spark Dataframe/Dataset:通用條件累積和

val inputDF= spark.sparkContext.parallelize(Seq((1,2,30, 100, -1),(1,2,30, 100, 0), (1,2,30, 100, 1),(11,21,30, 100, -1),(11,21,30, 100, 0), (11,21,30, 100, 1)), 10).toDF("c1", "c2", "v1", "v2", "offset") 
inputDF: org.apache.spark.sql.DataFrame = [c1: int, c2: int ... 3 more fields] 

scala> inputDF.show 
+---+---+---+---+------+ 
| c1| c2| v1| v2|offset| 
+---+---+---+---+------+ 
| 1| 2| 30|100| -1| 
| 1| 2| 30|100|  0| 
| 1| 2| 30|100|  1| 
| 11| 21| 30|100| -1| 
| 11| 21| 30|100|  0| 
| 11| 21| 30|100|  1| 
+---+---+---+---+------+ 

我需要做的是,計算(c1,c2)在整個偏移量上的V1,V2的累積和。

我嘗試過,但這遠離可以在任何數據框上工作的通用解決方案。

import org.apache.spark.sql.expressions.Window 

val groupKey = List("c1", "c2").map(x => col(x.trim)) 
val orderByKey = List("offset").map(x => col(x.trim)) 

val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*) 

val outputDF = inputDF 
    .withColumn("cumulative_v1", sum(inputDF("v1")).over(w)) 
    .withColumn("cumulative_v2", sum(inputDF("v2")).over(w)) 

+---+---+---+---+------+---------------------------- 
| c1| c2| v1| v2|offset|cumulative_v1| cumulative_v2| 
+---+---+---+---+------+-------------|--------------| 
| 1| 2| 30|100| -1|30   | 100   | 
| 1| 2| 30|100|  0|60   | 200   | 
| 1| 2| 30|100|  1|90   | 300   | 
| 11| 21| 30|100| -1|30   | 100   | 
| 11| 21| 30|100|  0|60   | 200   | 
| 11| 21| 30|100|  1|90   | 300   | 
+---+---+---+---+------+----------------------------- 

的挑戰是[α]我需要做這種跨多個和不同偏移的窗口(-1至1),(-10〜10),(-30〜30)或任何其它並[b]我需要在多個數據框/數據集中使用此功能,所以我希望能夠使用RDD/Dataset中的通用函數。

有關我如何在Spark 2.0中實現這一點的任何想法?

非常感謝幫助。謝謝!

+0

歡迎堆棧溢出!我們是一個問答網站,而不是一個打碼人員的服務。請解釋你到目前爲止嘗試過的以及爲什麼它沒有奏效。請參閱:[爲什麼「有人可以幫助我?」不是一個實際的問題?](http://meta.stackoverflow.com/q/284236) –

+0

謝謝。我用我的解決方案獲得了上述結果集。現在添加它。 – Yash

回答

0

下面是使用數據框的原始圖片。

import org.apache.spark.sql.expressions.Window 

val groupKey = List("c1", "c2").map(x => col(x.trim)) 
val orderByKey = List("offset").map(x => col(x.trim)) 

val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*) 

val inputDF= spark 
    .sparkContext 
    .parallelize(Seq((1,2,30, 100, -1),(1,2,3, 100, -2),(1,2,140, 100, 2),(1,2,30, 100, 0), (1,2,30, 100, 1),(11,21,30, 100, -1),(11,21,30, 100, 0), (11,21,30, 100, 1)), 10) 
    .toDF("c1", "c2", "v1", "v2", "offset") 

val outputDF = inputDF 
    .withColumn("cumulative_v1", sum(when($"offset".between(-1, 1), inputDF("v1")).otherwise(0)).over(w)) 
    .withColumn("cumulative_v3", sum(when($"offset".between(-2, 2), inputDF("v1")).otherwise(0)).over(w)) 
    .withColumn("cumulative_v2", sum(inputDF("v2")).over(w)) 

這產生了對於不同窗口的單個「值」的累計總和。

scala> outputDF.show 
+---+---+---+---+------+-------------+-------------+-------------+    
| c1| c2| v1| v2|offset|cumulative_v1|cumulative_v3|cumulative_v2| 
+---+---+---+---+------+-------------+-------------+-------------+ 
| 1| 2| 3|100| -2|   0|   0|   100| 
| 1| 2| 30|100| -1|   30|   30|   200| 
| 1| 2| 30|100|  0|   60|   60|   300| 
| 1| 2| 30|100|  1|   90|   90|   400| 
| 1| 2|140|100|  2|   90|   90|   500| 
| 11| 21| 30|100| -1|   30|   30|   100| 
| 11| 21| 30|100|  0|   60|   60|   200| 
| 11| 21| 30|100|  1|   90|   90|   300| 
+---+---+---+---+------+-------------+-------------+-------------+ 

幾個這種方法的缺點 - [1]各條件窗口(-1,1),(-2,2)或任何(from_offset,to_offset),和()需要分開呼叫。 [2]這不是一個通用功能。

我知道火花接受列的聚集函數變量列表這樣的 -

val exprs = Map("v1" -> "sum", "v2" -> "sum") 

但我不能確定如何擴展該具有可變條件窗口功能。我仍然很想知道是否有更好的模塊化/可重用功能,我們可以寫出來解決這個問題。