2017-10-18 85 views
1

當我嘗試使用窗口和摺疊功能聚合元素時,元素的某些 從獲取聚合中錯過。使用來自卡夫卡(value:0, value:1, value:2, value:3)的元素 ,並將它們合併爲奇數和偶數值 。使用Flink窗口和摺疊功能,元素缺失?

輸出爲:10-13之間

{even=[0, 2, 4], odd=[1, 3]} 
{even=[6, 8], odd=[5, 7, 9]} 
{even=[14, 16, 18], odd=[15, 17]} 
{even=[20, 22], odd=[19, 21, 23]} 
{even=[24, 26, 28], odd=[25, 27]} 

號已丟失,發生這種情況的一組隨機 數字。有人可以建議從下面的代碼中漏掉了什麼, 我該如何確保處理所有元素?

public static class Splitter implements FlatMapFunction<String, 
    Tuple3<String, String, List<String>>{ 
    private static final long serialVersionUID = 1L; 

    @Override 
    public void flatMap(String value, Collector<Tuple3<String, String, 
     List<String>>out) throws Exception { 
     String[] vals = value.split(":"); 

     if(vals.length 1 && Integer.parseInt(vals[1]) % 2 == 0){ 
      out.collect(new Tuple3<String, String, List<String>> 
      ("test","even", Arrays.asList(vals[1]))); 
     }else{ 
      out.collect(new Tuple3<String, String, List<String>> 
      ("test","odd", Arrays.asList(vals[1]))); 
     } 
    } 
} 


    DataStream<Map<String, List<String>>streamValue = 
    kafkaStream.flatMap(new Splitter()).keyBy(0) 
    .window(TumblingEventTimeWindows.of(Time.milliseconds(3000))). 
    trigger(CustomizedCountTrigger.of(5L))//.trigger(CountTrigger.of(2)) 
    .fold(new HashMap<String, List<String>>(), new 
    FoldFunction<Tuple3<String, String, List<String>>, Map<String, 
    List<String>>>() { 
     private static final long serialVersionUID = 1L; 

     @Override 
     public Map<String, List<String>fold(Map<String, 
     List<String>accumulator, 
     Tuple3<String, String, List<String>value) throws 
     Exception { 
      if(accumulator.get(value.f1) != null){ 
       List<Stringlist = new ArrayList<>(); 
       list.addAll(accumulator.get(value.f1)); 
       list.addAll(value.f2); 
       accumulator.put(value.f1, list); 
      }else{ 
       accumulator.put(value.f1, value.f2); 
      } 
      return accumulator; 
     } 
    }); 

    streamValue.print(); 
    env.execute("window test"); 
} 


public class CustomizedCountTrigger<W extends Windowextends 
Trigger<Object, W{ 

    private static final long serialVersionUID = 1L; 
    private final long maxCount; 

    private final ReducingStateDescriptor<LongstateDesc = 
    new ReducingStateDescriptor<>("count", new Sum(), 
    LongSerializer.INSTANCE); 

    private CustomizedCountTrigger(long maxCount) { 
     this.maxCount = maxCount; 
    } 

    @Override 
    public TriggerResult onElement(Object element, long timestamp, W window, 
    TriggerContext ctx) throws Exception { 
     ReducingState<Longcount = ctx.getPartitionedState(stateDesc); 
     count.add(1L); 
     if (count.get() >= maxCount) { 
      count.clear(); 
      return TriggerResult.FIRE_AND_PURGE; 
     } 
     return TriggerResult.CONTINUE; 
    } 

    @Override 
    public TriggerResult onProcessingTime(long time, W window, 

    org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext 

    ctx) throws Exception { 
     return TriggerResult.CONTINUE; 
    } 

    @Override 
    public TriggerResult onEventTime(long time, W window, 

    org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext 

    ctx) throws Exception { 
     return TriggerResult.CONTINUE; 
    } 

    @Override 
    public void clear(W window, 
    org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext 

    ctx) 
    throws Exception { 
     ctx.getPartitionedState(stateDesc).clear(); 
    } 

    @Override 
    public String toString() { 
     return "CountTrigger(" + maxCount + ")"; 
    } 

    public static <W extends WindowCustomizedCountTrigger<Wof(long 
    maxCount) { 
     return new CustomizedCountTrigger<>(maxCount); 
    } 

    private static class Sum implements ReduceFunction<Long{ 
     private static final long serialVersionUID = 1L; 

     @Override 
     public Long reduce(Long value1, Long value2) throws Exception { 
      return value1 + value2; 
     } 

    } 
} 

回答

1

於是,我開始注意到您的自定義觸發使您正在使用的不相關的一個TumblingEventTime窗口排序之前其實寫這第一部分,但我想反正包括我原來的想法,因爲我不完全確定爲什麼你不使用EventTime窗口。我意識到這一點後的反應是低於原來的。

你是在單個並行還是多個上運行它?我之所以問,是因爲如果它是多重並行(並且kafka主題也是由多個分區組成),那麼消息可能以非連續的順序接收和處理。這可能導致帶有時間戳的消息導致水印前進,導致窗口處理消息。然後,下一個消息具有在當前水印時間之前的事件時間(a.k.a爲「遲到」)並且將導致該消息被丟棄。

因此,例如:如果你有20元和每一個的情況下,時間就像這樣:

MESSAGE1:EVENTTIME:1000個 MESSAGE1:EVENTTIME:2000 等等

以及活動時間窗口是5001ms。

現在消息message1到message9依次通過。這第一個窗口將被處理幷包含消息1-5(消息6將導致窗口被處理)。現在,如果message11在message10之前進入,它將導致包含消息6-9的窗口被處理。而當message10接下來時,水印已經超過了message10的事件時間,導致它被作爲「延遲事件」丟棄。

合適的回答

而不是使用EVENTTIME窗口和一個自定義觸發的,請嘗試使用countWindow。

所以替換此:

.window(TumblingEventTimeWindows.of(Time.milliseconds(3000))). 
trigger(CustomizedCountTrigger.of(5L))//.trigger(CountTrigger.of(2)) 

有了這個:

.countWindow(5L) 
+0

非常感謝您的時間和解釋。我同意使用eventTimeWindow並導致消息被丟棄。但我的用例如下所示。在此之前,我會澄清說,我試着使用並行(1)和並行(2),但問題保持不變,有些事件被丟棄。 – Sharath

+0

我的用例是當一個業務邏輯評​​估爲真時處理一組事件。例如,如果事件總數大於3或者偶數的事件總數大於5或預定義的時間窗被超過(例如2秒)。另外,如果你用我們自己的一個重寫窗口觸發我的理解,則實際觸發器將不再被考慮。在這種情況下,窗口的時間的流逝。 env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); env.setParallelism(1); @Jicaar我感謝你的意見。 – Sharath

+0

包含自定義觸發器會覆蓋默認觸發器。但什麼觸發自定義觸發器仍然有效。因此,當3000毫秒的TumblingEventTime窗口完成後,它將觸發自定義觸發器中的自定義onEventTime方法。但你有onEventTime方法設置爲只繼續不火和/或清洗(而默認的觸發將返回FIRE_AND_PURGE),使得事件的時間窗口基本上是沒有意義的,從我可以告訴。 – Jicaar