我正在一個火花流應用程序,我需要打印JSON屬性的最小值,最大值,該屬性應在每20秒窗口打印最小值,最大值2秒的滑動窗口。 基本上(對於POC)我想打印作業組sparkContext的Spark UI上的min,max。Spark Streaming窗口輸出
SetJobGroup ("count-min-max", "count-min-max value of quality attribute").
這應該在Spark UI顯示上每隔20秒顯示一次。
下面是我的代碼,我可以得到最小值,最大值,計數,但打印每隔2秒執行一次,這是流化間隔時間不在20秒的窗口上。
val ssc = new StreamingContext(sparkContext, Seconds(2))
val record = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY_SER_2)
val lines=record.map(_._2)
//val jsonCounts=lines.map { jsonRecord => parseJson(jsonRecord) }.map { x => x.mkString("\n") }.print
val valueDtsream:DStream[Array[Double]]=lines.map { jsonRecord => parseJson(jsonRecord) }
.window(Seconds(20),Seconds(2))
valueDtsream.foreachRDD
{
rdd =>
if (!rdd.partitions.isEmpty)
{
val stats = rdd.flatMap(x => x)
println(stats.count().toString()+"-"+stats.min().toString()+"-"+stats.max().toString)
}
}
ssc.start()
ssc.awaitTermination()
什麼問題? –
我想在每20秒的一個窗口打印最小,最大,計數,而我的流式批處理時間爲2秒。 – nilesh1212