3
我有輸入線,如下面火花流update_state_by_keys
T1,文件1,1,1,1個
T1,文件1,1,2,3
T1,file2的,2 ,2,2,2
t2中,文件1,5,5,5
t2時,文件2,1,1,2,2
和輸出如下面的行,這是相應數字的垂直加法。
file1的:[1+,1 + 2 + 5,1 + 3 + 5]
file2的:[2 + 1,2 + 1,2 + 2,2 + 2]
目前數據彙總邏輯正在爲批處理間隔工作,但它不處於維護狀態。所以,我添加update_state_by_key函數並傳遞下面的函數,這是正確的方法嗎?
我目前的計劃:
def updateValues(newValues: Seq[Array[Int]], currentValue: Option[Array[Int]]) = {
val previousCount = currentValue.getOrElse(Array.fill[Byte](newValues.length)(0))
val allValues = newValues +: previousCount
Some(allValues.toList.transpose.map(_.sum).toArray)
}
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("HBaseStream")
val sc = new SparkContext(conf)
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sc, Seconds(2))
// parse the lines of data into coverage objects
val inputStream = ssc.socketTextStream(<hostname>, 9999)
ssc.checkpoint("<hostname>:8020/user/spark/checkpoints_dir")
inputStream.print(10)
val parsedDstream = inputStream
.map(line => {
val splitLines = line.split(",")
(splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toInt))
})
val aggregated_file_counts = parsedDstream.updateStateByKey(updateValues)
// Start the computation
ssc.start()
// Wait for the computation to terminate
ssc.awaitTermination()
}
作爲參考,我以前的程序(沒有狀態轉變):
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("HBaseStream")
val sc = new SparkContext(conf)
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sc, Seconds(2))
val inputStream = ssc.socketTextStream("hostname", 9999)
val parsedDstream = inputStream
.map(line => {
val splitLines = line.split(",")
(splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toInt))
})
.reduceByKey((first, second) => {
val listOfArrays = ArrayBuffer(first, second)
listOfArrays.toList.transpose.map(_.sum).toArray
})
.foreachRDD(rdd => rdd.foreach(Blaher.blah))
}
在此先感謝。
謝謝Zero323,我只是編輯了我的問題,看到了你的答案。我正在檢查你的方法,會更新。再次感謝。 – Vibhuti
非常感謝你Zero323,它的工作! – Vibhuti
嗨zero323,它工作正常,但是當我終止程序並再次盯着我的程序時,它從一開始就在計算。它預期的行爲?我可以更改有狀態轉換中的任何設置,以便它能記住以前的狀態嗎? – Vibhuti