2016-02-22 228 views
3

我有輸入線,如下面火花流update_state_by_keys

T1,文件1,1,1,1個

T1,文件1,1,2,3

T1,file2的,2 ,2,2,2

t2中,文件1,5,5,5

t2時,文件2,1,1,2,2

和輸出如下面的行,這是相應數字的垂直加法。

file1的:[1+,1 + 2 + 5,1 + 3 + 5]

file2的:[2 + 1,2 + 1,2 + 2,2 + 2]

目前數據彙總邏輯正在爲批處理間隔工作,但它不處於維護狀態。所以,我添加update_state_by_key函數並傳遞下面的函數,這是正確的方法嗎?

我目前的計劃:

def updateValues(newValues: Seq[Array[Int]], currentValue: Option[Array[Int]]) = { 

     val previousCount = currentValue.getOrElse(Array.fill[Byte](newValues.length)(0)) 
     val allValues = newValues +: previousCount 
     Some(allValues.toList.transpose.map(_.sum).toArray) 

     } 

def main(args: Array[String]): Unit = { 
    val conf = new SparkConf().setAppName("HBaseStream") 
    val sc = new SparkContext(conf) 
    // create a StreamingContext, the main entry point for all streaming functionality 
    val ssc = new StreamingContext(sc, Seconds(2)) 
    // parse the lines of data into coverage objects 
    val inputStream = ssc.socketTextStream(<hostname>, 9999) 
    ssc.checkpoint("<hostname>:8020/user/spark/checkpoints_dir") 
    inputStream.print(10) 
    val parsedDstream = inputStream 
     .map(line => { 
     val splitLines = line.split(",") 
     (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toInt)) 
     }) 
    val aggregated_file_counts = parsedDstream.updateStateByKey(updateValues) 

     // Start the computation 
    ssc.start() 
    // Wait for the computation to terminate 
    ssc.awaitTermination() 

    } 

作爲參考,我以前的程序(沒有狀態轉變):

def main(args: Array[String]): Unit = { 
     val conf = new SparkConf().setAppName("HBaseStream") 
     val sc = new SparkContext(conf) 
     // create a StreamingContext, the main entry point for all streaming functionality 
     val ssc = new StreamingContext(sc, Seconds(2)) 
     val inputStream = ssc.socketTextStream("hostname", 9999) 
     val parsedDstream = inputStream 
      .map(line => { 
      val splitLines = line.split(",") 
      (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toInt)) 
      }) 
      .reduceByKey((first, second) => { 
      val listOfArrays = ArrayBuffer(first, second) 
      listOfArrays.toList.transpose.map(_.sum).toArray 
      }) 
      .foreachRDD(rdd => rdd.foreach(Blaher.blah)) 
    } 

在此先感謝。

回答

1

您要找的是updateStateByKey。對於DStream[(T, U)]應該採取函數兩個參數:

  • Seq[U] - 代表狀態當前窗口
  • Option[U] - 代表的堆積狀態

,並返回Option[U]

鑑於你的代碼可能例如可以實現這樣的:

import breeze.linalg.{DenseVector => BDV} 
import scala.util.Try 

val state: DStream[(String, Array[Int])] = parsedStream.updateStateByKey(
    (current: Seq[Array[Int]], prev: Option[Array[Int]]) => { 
    prev.map(_ +: current).orElse(Some(current)) 
    .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption) 
}) 

爲了能夠使用它,你就必須configure checkpointing

+0

謝謝Zero323,我只是編輯了我的問題,看到了你的答案。我正在檢查你的方法,會更新。再次感謝。 – Vibhuti

+0

非常感謝你Zero323,它的工作! – Vibhuti

+0

嗨zero323,它工作正常,但是當我終止程序並再次盯着我的程序時,它從一開始就在計算。它預期的行爲?我可以更改有狀態轉換中的任何設置,以便它能記住以前的狀態嗎? – Vibhuti