我想創建一個Trigger
,它在20秒內第一次觸發並且在此後每隔5秒觸發一次。我已經使用GlobalWindows
和一個自定義Trigger
Flink自定義觸發器給予意想不到的輸出
val windowedStream = valueStream
.keyBy(0)
.window(GlobalWindows.create())
.trigger(TradeTrigger.of())
這裏是TradeTrigger
代碼:
@PublicEvolving
public class TradeTrigger<W extends Window> extends Trigger<Object, W> {
private static final long serialVersionUID = 1L;
static boolean flag=false;
static long ctime = System.currentTimeMillis();
private TradeTrigger() {
}
@Override
public TriggerResult onElement(
Object arg0,
long arg1,
W arg2,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
throws Exception {
// TODO Auto-generated method stub
if(flag == false){
if((System.currentTimeMillis()-ctime) >= 20000){
flag = true;
ctime = System.currentTimeMillis();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
} else {
if((System.currentTimeMillis()-ctime) >= 5000){
ctime = System.currentTimeMillis();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(
long arg0,
W arg1,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
throws Exception {
// TODO Auto-generated method stub
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(
long arg0,
W arg1,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
throws Exception {
// TODO Auto-generated method stub
return TriggerResult.CONTINUE;
}
public static <W extends Window> TradeTrigger<W> of() {
return new TradeTrigger<>();
}
}
因此,基本上,當flag
是false
,即首次Trigger
應該在20秒內被解僱並將flag
設置爲true
。從下一次開始,它應該每5秒發射一次。
我面臨的問題是,每次觸發Trigger
時,我在輸出中只收到一條消息。也就是說,我在20秒後收到一條消息,每五秒收到一條消息。 我期待在每次觸發的輸出中有20條消息。
如果我使用.timeWindow(Time.seconds(5))
並創建一個五秒鐘的時間窗口,我每5秒鐘會得到20條輸出消息。 請幫我拿這段代碼吧。有什麼我失蹤?
你能否用更新的代碼更新答案?這將非常有幫助 – Akash