2017-06-18 78 views
1

我想分享這個流:1,1,1,2,2,2,2,2,3,3,3,3,3,3,3,0,3,3,3,5,。 ..這些會話:如何與Apache Flink會話流?

1,1,1 
2,2,2,2,2 
3,3,3,3,3,3,3 
0 
3,3,3 
5 

我已經寫CustomTrigger檢測何時流元素從1到2改變(2到3,3到0等),然後觸發觸發器。但這不是解決方案,因爲當我處理2的第一個元素並觸發觸發器時,窗口將是[1,1,1,2],但我需要觸發1的最後一個元素上的觸發器。

這裏是我的onElement功能04-0030-03在我的自定義觸發類:

override def onElement(element: Session, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = { 
    if (prevState == element.value) { 
     prevState = element.value 
     TriggerResult.CONTINUE 
    } else { 
     prevState = element.value 
     TriggerResult.FIRE 
    } 
} 

我怎樣才能解決這個問題?

回答

2

我認爲FlatMapFunctionListState是實現此用例的最簡單方法。

當新元素到達時(即調用flatMap()方法),您檢查值是否更改。如果該值沒有更改,則將該元素附加到該狀態。如果該值發生更改,則會將當前列表狀態作爲會話發送,請清除該列表,然後將新元素作爲第一個列表狀態插入。

但是,您應該記住,這假定元素的順序被保留。 Flink確保在一個分區內,即只要元素不被混洗,並且所有操作員以相同的並行性運行。