2017-07-24 37 views
0

我有一個DataFrame,有兩個column,「index」和「values」,我想根據列「values」得到「delayValues」,這是我的代碼:如何在窗口中使用partitionBy函數scala/spark

val arr = Array(1,4,3,2,5,7,3,5,4,18) 
    val input=new ArrayBuffer[(Int,Int)]() 
    for(i<-0 until 10){ 
     input.append((i,arr(i))) 
    } 
    val window=Window.rowsBetween(-2,0) 
    val df = sc.parallelize(input, 4).toDF("index","values") 
    df.withColumn("valueDealy",first(col("values")).over(window)).show() 

這是結果:

enter image description here

這是我除了結果,但我發現所有的數據被收集到一個分區,然後我用partitionBy功能,這是我的改變代碼:

val arr = Array(1,4,3,2,5,7,3,5,4,18) 
    val input=new ArrayBuffer[(Int,Int)]() 
    for(i<-0 until 10){ 
     input.append((i,arr(i))) 
    } 
    val window=Window.orderBy(col("index")).partitionBy(col("index")).rowsBetween(-2,0) 
    val df = sc.parallelize(input, 4).toDF("index","values") 
    df.withColumn("valueDealy",first(col("values")).over(window)).show() 

結果是:

+-----+------+----------+ 
|index|values|valueDealy| 
+-----+------+----------+ 
| 0|  1|   1| 
| 3|  2|   2| 
| 7|  5|   5| 
| 9| 18|  18| 
| 4|  5|   5| 
| 6|  3|   3| 
| 5|  7|   7| 
| 2|  3|   3| 
| 1|  4|   4| 
| 8|  4|   4| 
+-----+------+----------+ 

我得到一個錯誤的結果,當我使用partitionBy,我應該怎麼辦謝謝!

我除了輸出

 +-----+------+----------+ 
     |index|values|valueDealy| 
     +-----+------+----------+ 
     | 0|  1|   1| 
     | 1|  4|   1| 
     | 2|  3|   1| 
     | 3|  2|   4| 
     | 4|  5|   3| 
     | 5|  7|   2| 
     | 6|  3|   5| 
     | 7|  5|   7| 
     | 8|  4|   3| 
     | 9| 18|   5| 
     +-----+------+----------+ 

和數據在多個分區!

+0

你預期的產量是多少? –

+0

我改變了我的問題@RameshMaharjan – mentongwu

回答

0

通常,沒有有效的解決方案可以直接用Spark SQL來表達。就個人而言,使用Scala的時候,我會用mllib功能:

import org.apache.spark.mllib.rdd.RDDFunctions._ 
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.Row 

val n = 2 
spark.createDataFrame(
    df.rdd.sliding(n + 1).map { xs => Row(xs(0), xs(n)) }, 
    StructType(Seq(
    StructField("delay", df.schema), StructField("current", df.schema)))) 

,但如果您的數據集包含連續的ID,您也可以join

df.alias("current").join(
    df.withColumn("index", $"index" - n).alias("previous"), Seq("index")) 

請沒有這兩個解決方案可能需要一些修正在該系列的開始/結束。使用join可以使用outer加入,使用sliding您可以使用union加上數據集開始/結束處所需的記錄數。

相關問題