2016-11-09 27 views
7

我有以下格式的數據,Flink Streaming:如何實現由開始和結束元素定義的窗口?

SIP | 2405463430 | 4115474257 | 8.205142580136622E12 |星期二年11月8 16時58分58秒IST 2016 |邀請RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |星期二年11月8 16時58分58秒IST 2016 | 0 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |星期二 年11月8 16時58分58秒北京時間2016 | 1 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |星期二年11月8 16時58分58秒IST 2016 | 2 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |星期二年11月8 16時58分58秒IST 2016 | 3 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |星期二 年11月8 16時58分58秒IST 2016 | 4 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 5 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 6 RTP | 2405463430 | 4115474257 | 8.205142580136622 E12 | Tue Nov 08 16:58:58 IST 2016 | 7 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 8 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16時58分58秒IST 2016 | 9 SIP | 2405463430 | 4115474257 | 8.205142580136622E12 |星期二 年11月8 16時58分58秒北京時間2016 | BYE

我想遇到SIP-INVITE消息時,開始我的窗口並觸發一個事件時遇到消息,執行一些聚合。

我該怎麼做? SIP-INVITE消息在給定用戶的任何時間點發出,而且我也可能同時有多個用戶發送多個SIP-INVITE消息。

回答

2

我認爲你可以用用戶鍵入的全局窗口來解決你的用例。全局窗口收集每個密鑰的所有數據,並將觸發和清除窗口的責任推到用戶定義的Trigger函數。

全局窗口定義如下:

val input: DataStream[(String, Int, String)] = ??? // (userId, value, marker) 
val agg = input 
    // one global window per user (handles overlapping SIP-INVITE events). 
    .keyBy(_._1) 
    // collect all data for each user until the trigger fires and purges the window. 
    .window(GlobalWindows.create()) 
    // you have to implement a custom trigger which reacts on the marker. 
    .trigger(new YourCustomTrigger()) 
    // the window function computes your aggregation. 
    .apply(new YourWindowFunction()) 

我覺得一個觸發器,它執行以下操作應該工作(假定SIP-INVITE事件始終啓動一個會話)。 Trigger.onElement()方法應檢查SIP-BYE字段並觸發窗口評估並清除窗口,即返回TriggerResult.FIRE_AND_PURGE。這將調用評估函數並刪除窗口狀態。

請注意,如果您要支持亂序事件,則需要特別注意(在這種情況下,您應該設置一個事件時間計時器到關閉元素的時間戳以確保接收到時間戳之前的所有數據)。如果因爲數據不在「介於」SIP-INVITESIP-BYE之間而應該丟棄數據,則還需要處理該數據。

詳情見的global windowstriggers的文件中,[Trigger][3] JavaDoc中,這blog post

+0

謝謝:)這非常有幫助! –