2017-09-13 102 views
1

推拉窗我用這個代碼來執行我的測試(Flink Quick Start):壞逆足與弗林克

val text = env.socketTextStream("localhost", port, '\n') 

    // parse the data, group it, window it, and aggregate the counts 
    val windowCounts = text 
     .flatMap { w => w.split("\\s") } 
     .map { w => WordWithCount(w, 1) } 
     .keyBy("word") 
     .timeWindow(Time.minute(15)) 
     .sum("count") 

與此代碼我有超過65 000輸入/ Seconde系列

如果我改變

timeWindow(Time.minute(15)) 

通過

timeWindow(Time.minutes(15), Time.seconds(1)) 

我有少於2 500輸入/ seconde

有什麼辦法可以有更好的性能與滑動Windows?

回答

2

使用15分鐘的滾動窗口,每個傳入事件被分配到一個窗口,而帶有一秒滑動的15分鐘滑動窗口,每個傳入事件被複制到15 * 60 = 900窗口。這顯然會對性能產生影響。

根據您的應用需求,您可以通過使用ProcessFunction或實現自定義窗口邏輯,以較少的開銷計算所需的內容。例如,您可以預先聚合到900秒的窗口中,然後有第二層窗口,通過減去即將到期的第二個對總數的貢獻來遞增調整15分鐘結果,並添加最近一秒的值。