2016-11-21 71 views
0

我正在一個火花流應用程序,我需要打印JSON屬性的最小值,最大值,該屬性應在每20秒窗口打印最小值,最大值2秒的滑動窗口。 基本上(對於POC)我想打印作業組sparkContext的Spark UI上的min,max。Spark Streaming窗口輸出

SetJobGroup ("count-min-max", "count-min-max value of quality attribute"). 

這應該在Spark UI顯示上每隔20秒顯示一次。

下面是我的代碼,我可以得到最小值,最大值,計數,但打印每隔2秒執行一次,這是流化間隔時間不在20秒的窗口上。

val ssc = new StreamingContext(sparkContext, Seconds(2)) 

val record = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY_SER_2) 
val lines=record.map(_._2) 

     //val jsonCounts=lines.map { jsonRecord => parseJson(jsonRecord) }.map { x => x.mkString("\n") }.print 

     val valueDtsream:DStream[Array[Double]]=lines.map { jsonRecord => parseJson(jsonRecord) } 
          .window(Seconds(20),Seconds(2)) 

     valueDtsream.foreachRDD 
     { 
     rdd => 
      if (!rdd.partitions.isEmpty) 
      { 
      val stats = rdd.flatMap(x => x) 
      println(stats.count().toString()+"-"+stats.min().toString()+"-"+stats.max().toString) 
      } 
     } 

     ssc.start() 
     ssc.awaitTermination() 
+0

什麼問題? –

+0

我想在每20秒的一個窗口打印最小,最大,計數,而我的流式批處理時間爲2秒。 – nilesh1212

回答

0

我想你是slideIntervalwindowLength之間的混淆。在window(windowLength, slideInterval)中:

  1. windowLength是窗口的長度,這意味着窗口應考慮多少個數據區間來計算。
  2. slideInterval窗口計算完成後,窗口移動多少個間隔。

如果我正確理解你的問題,你應該將其編輯爲:.window(Seconds(x),Seconds(20))

+0

我是新來的,可能請你幫我解釋一下window(windowLength,slideInterval)函數;它是如何工作的 – nilesh1212

+0

在你的問題中,你已經指定它打印「最小,最大,在每20秒窗口計數」,「而我的流式批處理時間是2秒」。但你的窗戶長度是多少? – vdep

+0

編輯答案 – vdep