2016-04-22 25 views
0

我想創建一個Trigger,它在20秒內第一次觸發並且在此後每隔5秒觸發一次。我已經使用GlobalWindows和一個自定義TriggerFlink自定義觸發器給予意想不到的輸出

val windowedStream = valueStream 
          .keyBy(0) 
          .window(GlobalWindows.create()) 
          .trigger(TradeTrigger.of()) 

這裏是TradeTrigger代碼:

@PublicEvolving 
public class TradeTrigger<W extends Window> extends Trigger<Object, W> { 

    private static final long serialVersionUID = 1L; 

    static boolean flag=false; 
    static long ctime = System.currentTimeMillis(); 

    private TradeTrigger() { 
    } 

    @Override 
    public TriggerResult onElement(
      Object arg0, 
      long arg1, 
      W arg2, 
      org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3) 
      throws Exception { 
     // TODO Auto-generated method stub 

     if(flag == false){ 
      if((System.currentTimeMillis()-ctime) >= 20000){ 
       flag = true; 
       ctime = System.currentTimeMillis(); 
       return TriggerResult.FIRE; 
      } 
      return TriggerResult.CONTINUE; 
     } else { 
      if((System.currentTimeMillis()-ctime) >= 5000){ 
       ctime = System.currentTimeMillis(); 
       return TriggerResult.FIRE; 
      } 
      return TriggerResult.CONTINUE; 
     } 

    } 

    @Override 
    public TriggerResult onEventTime(
      long arg0, 
      W arg1, 
      org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2) 
      throws Exception { 
     // TODO Auto-generated method stub 
     return TriggerResult.CONTINUE; 
    } 

    @Override 
    public TriggerResult onProcessingTime(
      long arg0, 
      W arg1, 
      org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2) 
      throws Exception { 
     // TODO Auto-generated method stub 
     return TriggerResult.CONTINUE; 
    } 


    public static <W extends Window> TradeTrigger<W> of() { 
     return new TradeTrigger<>(); 
    } 

} 

因此,基本上,當flagfalse,即首次Trigger應該在20秒內被解僱並將flag設置爲true。從下一次開始,它應該每5秒發射一次。

我面臨的問題是,每次觸發Trigger時,我在輸出中只收到一條消息。也就是說,我在20秒後收到一條消息,每五秒收到一條消息。 我期待在每次觸發的輸出中有20條消息。

如果我使用.timeWindow(Time.seconds(5))並創建一個五秒鐘的時間窗口,我每5秒鐘會得到20條輸出消息。 請幫我拿這段代碼吧。有什麼我失蹤?

回答

1

得到它在Fabian和Flink郵件列表的答案幫助下工作。 通過TriggerContext將狀態存儲在ValueState變量中。檢查onEvent()方法中的變量,如果它是第一次,則註冊一個processingTimeTimer比當前時間多20秒並更新狀態。在onProcessingTime方法中,註冊另一個ProcessingTimeTimer比當前時間多5秒,更新狀態並開啓Window

+0

你能否用更新的代碼更新答案?這將非常有幫助 – Akash

3

有你的觸發執行的幾個問題:

  1. 你應該函數的狀態從來沒有存儲在一個靜態變量。 Flink不會隔離JVM中的用戶進程。相反,它使用每個TaskManager的單個JVM並啓動多個線程。因此,您的靜態布爾標誌在多個觸發器實例之間共享。您應該存儲Flink的ValueState接口,該接口可從TriggerContext訪問。如果發生故障,Flink會小心檢查您的狀態並恢復。

  2. Trigger.onEvent()僅在新事件到達時調用。所以它不能用於在特定時間觸發窗口計算。相反,您應該註冊事件時間計時器或處理時間計時器(再次通過TriggerContext)。計時器將分別呼叫Trigger.onEventTime()Trigger.onProcessingTime()。是否使用事件或處理時間取決於您的使用情況。

+0

再次謝謝Fabian。但我需要知道如何以編程方式完成此任務。 1.如何將我的標誌變量存儲在'ValueState'中,以及如何訪問它? 2.我是否需要在我的'Trigger'的'onProcessingTime()'或'onEventTime()'重寫方法中寫入邏輯? 3.什麼是通過觸發上下文註冊計時器的正確語法? –