2016-11-09 27 views

我有以下格式的數據,Flink Streaming:如何實現由開始和結束元素定義的窗口?

SIP | 2405463430 | 4115474257 | 8.205142580136622E12 |星期二年11月8 16時58分58秒IST 2016 |邀請RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |星期二年11月8 16時58分58秒IST 2016 | 0 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |星期二 年11月8 16時58分58秒北京時間2016 | 1 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |星期二年11月8 16時58分58秒IST 2016 | 2 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |星期二年11月8 16時58分58秒IST 2016 | 3 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |星期二 年11月8 16時58分58秒IST 2016 | 4 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 5 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 6 RTP | 2405463430 | 4115474257 | 8.205142580136622 E12 | Tue Nov 08 16:58:58 IST 2016 | 7 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 8 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16時58分58秒IST 2016 | 9 SIP | 2405463430 | 4115474257 | 8.205142580136622E12 |星期二 年11月8 16時58分58秒北京時間2016 | BYE


我該怎麼做? SIP-INVITE消息在給定用戶的任何時間點發出,而且我也可能同時有多個用戶發送多個SIP-INVITE消息。





val input: DataStream[(String, Int, String)] = ??? // (userId, value, marker) 
val agg = input 
    // one global window per user (handles overlapping SIP-INVITE events). 
    // collect all data for each user until the trigger fires and purges the window. 
    // you have to implement a custom trigger which reacts on the marker. 
    .trigger(new YourCustomTrigger()) 
    // the window function computes your aggregation. 
    .apply(new YourWindowFunction()) 

我覺得一個觸發器,它執行以下操作應該工作(假定SIP-INVITE事件始終啓動一個會話)。 Trigger.onElement()方法應檢查SIP-BYE字段並觸發窗口評估並清除窗口,即返回TriggerResult.FIRE_AND_PURGE。這將調用評估函數並刪除窗口狀態。


詳情見的global windowstriggers的文件中,[Trigger][3] JavaDoc中,這blog post


謝謝:)這非常有幫助! –