2017-09-11

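How to count the number of items per time window?

I am trying to use Spark Structured Streaming to count the number of items coming from Kafka in each time window, using the following code: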

import java.text.SimpleDateFormat 
import java.util.Date 
import org.apache.spark.sql.ForeachWriter 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.sql.functions.window 

object Counter extends App { 
    val dateFormatter = new SimpleDateFormat("HH:mm:ss") 
    val spark = ... 
    import spark.implicits._ 

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", ...)
      .option("subscribe", ...)
      .load()

    val windowDuration = "5 minutes" 
    val counts = df
      .select("value").as[Array[Byte]]
      .map(decodeTimestampFromKafka).toDF("timestamp")
      .select($"timestamp" cast "timestamp")
      .withWatermark("timestamp", windowDuration)
      .groupBy(window($"timestamp", windowDuration, "1 minute"))
      .count()
      .as[((Long, Long), Long)]

    val writer = new ForeachWriter[((Long, Long), Long)] {
      var partitionId: Long = _
      var version: Long = _

      def open(partitionId: Long, version: Long): Boolean = {
        this.partitionId = partitionId
        this.version = version
        true
      }

      def process(record: ((Long, Long), Long)): Unit = {
        val ((start, end), docs) = record
        val startDate = dateFormatter.format(new Date(start))
        val endDate = dateFormatter.format(new Date(end))
        val now = dateFormatter.format(new Date)
        println(s"$now:$this|$partitionId|$version: ($startDate, $endDate) $docs")
      }

      def close(errorOrNull: Throwable): Unit = {}
    }

    val query = counts
      .repartition(1)
      .writeStream
      .outputMode("complete")
      .foreach(writer)
      .start()

    query.awaitTermination() 

    def decodeTimestampFromKafka(bytes: Array[Byte]): Long = ... 
} 

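I expected it to output one record once every minute (the slide duration), since the only aggregation key is the window, containing the item count for the last 5 minutes (the window duration). Instead, it outputs several records 2-3 times per minute, as in this sample: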

... 
22:44:34|…|0|8: (22:43:20, 22:43:20) 383
22:44:34|…|0|8: (22:43:18, 22:43:19) 435
22:44:34|…|0|8: (22:42:33, 22:42:34) 395
22:44:34|…|0|8: (22:43:14, 22:43:14) 435
22:44:34|…|0|8: (22:43:09, 22:43:09) 437
22:44:34|…|0|8: (22:43:19, 22:43:19) 411
22:44:34|…|0|8: (22:43:07, 22:43:07) 400
22:44:34|…|0|8: (22:43:17, 22:43:17) 392
22:44:44|…|0|9: (22:43:37, 22:43:38) 420
22:44:44|…|0|9: (22:43:25, 22:43:25) 395
22:44:44|…|0|9: (22:43:22, 22:43:22) 416
22:44:44|…|0|9: (22:43:00, 22:43:00) 438
22:44:44|…|0|9: (22:43:41, 22:43:41) 426
22:44:44|…|0|9: (22:44:13, 22:44:13) 132
22:44:44|…|0|9: (22:44:02, 22:44:02) 128
22:44:44|…|0|9: (22:44:09, 22:44:09) 120
... 
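The sample windows look like (22:43:20, 22:43:20) rather than two bounds five minutes apart. One possible explanation, assuming `decodeTimestampFromKafka` returns epoch milliseconds: Spark's cast from a number to `timestamp` interprets the number as seconds, so the "5 minutes" window ends up spanning 300 real milliseconds. If the window bounds then come back as seconds through `.as[((Long, Long), Long)]` and are passed to `new Date(...)`, which expects milliseconds, the two bounds print as nearly equal wall-clock times. The plain-Scala sketch below reproduces only that arithmetic, without Spark. `UnitMismatch`, `latestWindow`, `fmt` and the event value `1505169800450L` are hypothetical, and the formatter is pinned to UTC so the result does not depend on the local time zone.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

object UnitMismatch {
  val WindowLen = 300L // "5 minutes", interpreted as seconds
  val Slide = 60L      // "1 minute", interpreted as seconds

  // Latest of the overlapping windows [start, end) that contain t, in t's own unit.
  def latestWindow(t: Long): (Long, Long) = {
    val start = t - Math.floorMod(t, Slide)
    (start, start + WindowLen)
  }

  // Formats epoch milliseconds like the question's writer, but pinned to UTC.
  def fmt(millis: Long): String = {
    val f = new SimpleDateFormat("HH:mm:ss")
    f.setTimeZone(TimeZone.getTimeZone("UTC"))
    f.format(new Date(millis))
  }

  def main(args: Array[String]): Unit = {
    val eventMillis = 1505169800450L // hypothetical decoded value in epoch milliseconds

    // Suspected path: the millisecond value is windowed as if it were seconds,
    // and the resulting bounds are then handed to new Date(...) as milliseconds.
    val (badStart, badEnd) = latestWindow(eventMillis)
    println(s"buggy: (${fmt(badStart)}, ${fmt(badEnd)})")

    // Consistent units: window in seconds, then convert the bounds back to milliseconds.
    val (start, end) = latestWindow(eventMillis / 1000)
    println(s"fixed: (${fmt(start * 1000)}, ${fmt(end * 1000)})")
  }
}
```

Running `main` prints `buggy: (22:43:20, 22:43:20)` and `fixed: (22:43:00, 22:48:00)`. If this diagnosis holds, the analogous change in the Spark job would be converting to seconds before the cast, for example `($"timestamp" / 1000) cast "timestamp"`, and reading the window bounds as timestamps rather than raw numbers.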

Changing the output mode to "append" seems to change the behavior, but the result is still far from what I expected.
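A simplified, hypothetical model of Spark's documented output modes may help explain both observations. With a 5-minute window sliding every minute, each event falls into five overlapping windows, so even a single event yields five result rows. In "complete" mode every trigger re-emits the whole result table; in "append" mode a window is emitted once, only after the watermark (maximum event time seen minus the `withWatermark` delay) has passed the window's end. `OutputModes`, `windowsFor` and `run` are illustrative names, times are seconds since midnight, and the model ignores that Spark applies an updated watermark only on the following trigger, so real append output lags further.

```scala
import scala.collection.mutable

object OutputModes {
  val WindowLen = 300L // "5 minutes", in seconds
  val Slide = 60L      // "1 minute", in seconds
  val Delay = 300L     // withWatermark delay, in seconds

  type Row = ((Long, Long), Long) // ((windowStart, windowEnd), count)

  // All [start, end) windows containing t; assumes WindowLen is a multiple of Slide.
  def windowsFor(t: Long): Seq[(Long, Long)] = {
    val latestStart = t - Math.floorMod(t, Slide)
    (0L until WindowLen / Slide).map { k =>
      val start = latestStart - k * Slide
      (start, start + WindowLen)
    }
  }

  // For each batch of event times: (rows emitted in complete mode, rows emitted in append mode).
  def run(batches: Seq[Seq[Long]]): Seq[(Seq[Row], Seq[Row])] = {
    val counts = mutable.Map.empty[(Long, Long), Long]
    val emitted = mutable.Set.empty[(Long, Long)]
    var maxEventTime = 0L // assumes non-negative event times
    batches.map { batch =>
      for (t <- batch; w <- windowsFor(t)) counts(w) = counts.getOrElse(w, 0L) + 1
      if (batch.nonEmpty) maxEventTime = math.max(maxEventTime, batch.max)
      val watermark = maxEventTime - Delay
      // Complete mode: every window aggregated so far, on every trigger.
      val all = counts.toSeq.sortBy(_._1)
      // Append mode: only windows whose end the watermark has passed, each emitted once.
      val finalized = all.filter { case (w, _) => w._2 <= watermark && !emitted(w) }
      emitted ++= finalized.map(_._1)
      (all, finalized)
    }
  }

  def main(args: Array[String]): Unit = {
    // 22:43:20, 22:43:50 and 22:53:20 as seconds since midnight, one event per trigger.
    val results = run(Seq(Seq(81800L), Seq(81830L), Seq(82400L)))
    results.zipWithIndex.foreach { case ((all, finalized), i) =>
      println(s"trigger $i: complete=${all.size} rows, append=${finalized.size} rows")
    }
  }
}
```

In this model the first two triggers emit five rows each in complete mode and nothing in append mode; the third trigger emits ten rows in complete mode, while append mode emits only the five earlier windows, each with its final count of 2.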

What is wrong with my assumptions about how this should work? Given the code above, how should the sample output be interpreted or used?
