2015-12-08 146 views
9

我遇到了一些麻煩,理解事件時間窗口周圍的語義。以下程序會生成一些帶時間戳的元組,用作事件時間並進行簡單的窗口聚合。我希望輸出的順序與輸入相同,但輸出順序不同。爲什麼輸出與事件時間無關?Flink流事件時間窗口排序

import java.util.concurrent.TimeUnit 
import org.apache.flink.streaming.api.TimeCharacteristic 
import org.apache.flink.streaming.api.windowing.time.Time 
import org.apache.flink.streaming.api.scala._ 

object WindowExample extends App { 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
    env.getConfig.enableTimestamps() 
    env.setParallelism(1) 

    val start = 1449597577379L 
    val tuples = (1 to 10).map(t => (start + t * 1000, t)) 

    env.fromCollection(tuples) 
     .assignAscendingTimestamps(_._1) 
     .timeWindowAll(Time.of(1, TimeUnit.SECONDS)) 
     .sum(1) 
     .print() 

    env.execute() 
} 

輸入:

(1449597578379,1) 
(1449597579379,2) 
(1449597580379,3) 
(1449597581379,4) 
(1449597582379,5) 
(1449597583379,6) 
(1449597584379,7) 
(1449597585379,8) 
(1449597586379,9) 
(1449597587379,10) 

結果:

[info] (1449597579379,2) 
[info] (1449597581379,4) 
[info] (1449597583379,6) 
[info] (1449597585379,8) 
[info] (1449597587379,10) 
[info] (1449597578379,1) 
[info] (1449597580379,3) 
[info] (1449597582379,5) 
[info] (1449597584379,7) 
[info] (1449597586379,9) 

回答

10

這樣做的原因的行爲是在弗林克元件(相對於時間戳)的順序不考慮帳戶。只有水印的正確性及其與元素時間戳的關係對於考慮時間的操作很重要,因爲水印通常會觸發基於時間的操作中的計算。

在您的示例中,窗口操作符將來自源的所有元素存儲在內部窗口緩衝區中。然後,源發出一個水印,表示將來不會有具有較小時間戳的元素到達。反過來,這又會告訴窗口操作員處理所有低於水印的結束時間戳的窗口(對於所有窗口都是如此)。因此,它發出所有窗口(以任意排序),然後它自己發出一個水印。下游的操作本身會接收到這些元素,並且可以在接收到水印後進行處理。

默認情況下,從源發出水印的時間間隔爲200毫秒。由於源頭髮出的少量元素會在發出第一個水印之前發出。在現實世界的用例中,水印發射間隔比窗口大小要小很多,您可以按照時間戳的順序獲得預期的窗口行爲。例如,如果每500毫秒有1小時窗口和水印。

+1

您可以給出還是指向下游操作的示例,它可以在收到水印後根據事件時間對元素重新排序?謝謝! –

+1

@MaximKolchin這樣的重排序發生在例如在CEP庫中。你可以看看這裏:https://github.com/apache/flink/blob/master/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java –