2017-10-05 100 views
0

假設我們有一個包含兩列(稱爲索引和值)的Spark數據集,按第一列(索引)排序。在Spark數據集中創建具有運行總計的列

((1, 100), (2, 110), (3, 90), ...) 

我們希望有一個數據集與具有運行總計值的第二列(值)的第三列。

((1, 100, 100), (2, 110, 210), (3, 90, 300), ...) 

任何建議,如何有效地做到這一點,有一次通過的數據?還是有沒有可用於此的任何罐裝CDF型功能?

如果需要,可以將數據集轉換爲Dataframe或RDD來完成任務,但它必須保持分佈式數據結構。也就是說,它不能被簡單地收集並轉換爲數組或序列,並且不會使用可變變量(僅適用於val,不適用var)。

回答

0

甲同事建議其依賴於RDD.mapPartitionsWithIndex()方法如下。 (據我所知,其他的數據結構不提供這種參考其分區的指標。)

val data = sc.parallelize((1 to 5)) // sc is the SparkContext 
val partialSums = data.mapPartitionsWithIndex{ (i, values) => 
    Iterator((i, values.sum)) 
}.collect().toMap // will in general have size other than data.count 
val cumSums = data.mapPartitionsWithIndex{ (i, values) => 
    val prevSums = (0 until i).map(partialSums).sum 
    values.scanLeft(prevSums)(_+_).drop(1) 
} 
1

但它必須保持分佈式數據結構。

不幸的是,你說過你試圖做的事情在Spark中是不可能的。如果您願意將數據集重新分區到單個分區(實際上將其整合到單個主機上),則可以輕鬆編寫一個函數來執行所需操作,並將增加後的值保留爲字段。

由於Spark函數在執行時不會在網絡中共享狀態,因此無法創建共享狀態,您需要將數據集保持完全分佈。

如果你願意放鬆要求,允許合併的數據,並在一臺主機上單通看一遍,然後你可以做你希望重新分區到單個分區和應用的功能是什麼。這不會將數據拖放到驅動程序上(將其保存在HDFS /集羣中),但仍然可以通過單個執行程序依次計算輸出。例如:

package com.github.nevernaptitsa 

import java.io.Serializable 
import java.util 

import org.apache.spark.sql.{Encoders, SparkSession} 

object SparkTest { 

    class RunningSum extends Function[Int, Tuple2[Int, Int]] with Serializable { 
    private var runningSum = 0 
    override def apply(v1: Int): Tuple2[Int, Int] = { 
     runningSum+=v1 
     return (v1, runningSum) 
    } 
    } 

    def main(args: Array[String]): Unit ={ 
    val session = SparkSession.builder() 
     .appName("runningSumTest") 
     .master("local[*]") 
     .getOrCreate() 
    import session.implicits._ 
    session.createDataset(Seq(1,2,3,4,5)) 
     .repartition(1) 
     .map(new RunningSum) 
     .show(5) 
    session.createDataset(Seq(1,2,3,4,5)) 
     .map(new RunningSum) 
     .show(5) 
    } 

} 

這裏的兩個語句顯示不同的輸出,第一提供正確的輸出(串行,因爲repartition(1)被稱爲),和因爲結果是並行計算的第二提供不正確的輸出。從第一個語句

結果:

+---+---+ 
| _1| _2| 
+---+---+ 
| 1| 1| 
| 2| 3| 
| 3| 6| 
| 4| 10| 
| 5| 15| 
+---+---+ 

結果從第二語句:

+---+---+ 
| _1| _2| 
+---+---+ 
| 1| 1| 
| 2| 2| 
| 3| 3| 
| 4| 4| 
| 5| 9| 
+---+---+ 
+0

感謝您詳細的答覆,@Ed。 我很懷疑。 也許我應該看看這個用例的近似方法類型[Greenwald-Khanna](http://infolab.stanford.edu/~datar/courses/cs361a/papers/quantiles.pdf)。 –

+0

沒問題@BahmanEngheta!如果你對我的答案滿意,你會介意將其標記爲已接受嗎? –

相關問題