When I try to aggregate elements using the window and fold functions, some of the elements are missed from the aggregation. I consume elements from Kafka (value:0, value:1, value:2, value:3, ...) and combine them into odd and even values. With the Flink window and fold functions, why are elements missing?
The output is (note the gap between 10 and 13):
{even=[0, 2, 4], odd=[1, 3]}
{even=[6, 8], odd=[5, 7, 9]}
{even=[14, 16, 18], odd=[15, 17]}
{even=[20, 22], odd=[19, 21, 23]}
{even=[24, 26, 28], odd=[25, 27]}
Values have been lost, and this happens for random runs of numbers. Can someone suggest what is missing from the code below, and how I can make sure that all elements are processed?
public static class Splitter implements FlatMapFunction<String,
        Tuple3<String, String, List<String>>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void flatMap(String value, Collector<Tuple3<String, String,
            List<String>>> out) throws Exception {
        String[] vals = value.split(":");
        if (vals.length > 1 && Integer.parseInt(vals[1]) % 2 == 0) {
            out.collect(new Tuple3<String, String, List<String>>(
                    "test", "even", Arrays.asList(vals[1])));
        } else {
            out.collect(new Tuple3<String, String, List<String>>(
                    "test", "odd", Arrays.asList(vals[1])));
        }
    }
}
DataStream<Map<String, List<String>>> streamValue =
    kafkaStream.flatMap(new Splitter()).keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(3000)))
        .trigger(CustomizedCountTrigger.of(5L)) //.trigger(CountTrigger.of(2))
        .fold(new HashMap<String, List<String>>(),
                new FoldFunction<Tuple3<String, String, List<String>>,
                        Map<String, List<String>>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Map<String, List<String>> fold(
                    Map<String, List<String>> accumulator,
                    Tuple3<String, String, List<String>> value) throws Exception {
                if (accumulator.get(value.f1) != null) {
                    List<String> list = new ArrayList<>();
                    list.addAll(accumulator.get(value.f1));
                    list.addAll(value.f2);
                    accumulator.put(value.f1, list);
                } else {
                    accumulator.put(value.f1, value.f2);
                }
                return accumulator;
            }
        });
streamValue.print();
env.execute("window test");
}
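Independent of Flink, the fold above is just even/odd bucketing, so every input element should land in exactly one bucket of the per-window accumulator. A plain-Java sketch of that accumulation logic (the class name `EvenOddFold` is made up for illustration):

```java
import java.util.*;

class EvenOddFold {
    // Bucket "value:N" records into even/odd lists, mirroring what the
    // Splitter + fold pipeline above is intended to produce per window.
    static Map<String, List<String>> fold(List<String> records) {
        Map<String, List<String>> acc = new HashMap<>();
        for (String record : records) {
            String[] vals = record.split(":");
            if (vals.length < 2) continue; // skip malformed input
            String key = Integer.parseInt(vals[1]) % 2 == 0 ? "even" : "odd";
            acc.computeIfAbsent(key, k -> new ArrayList<>()).add(vals[1]);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<String> in = Arrays.asList("value:0", "value:1", "value:2", "value:3");
        // Every element lands in exactly one bucket; nothing is dropped here.
        System.out.println(fold(in));
    }
}
```

If elements go missing in the Flink job but not in this logic, the loss is happening in the windowing/triggering layer, not in the fold itself.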
public class CustomizedCountTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long maxCount;
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(),
                    LongSerializer.INSTANCE);

    private CustomizedCountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }
@Override
public TriggerResult onElement(Object element, long timestamp, W window,
TriggerContext ctx) throws Exception {
ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
count.add(1L);
if (count.get() >= maxCount) {
count.clear();
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }
@Override
public String toString() {
return "CountTrigger(" + maxCount + ")";
}
    public static <W extends Window> CustomizedCountTrigger<W> of(long maxCount) {
        return new CustomizedCountTrigger<>(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
private static final long serialVersionUID = 1L;
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}
}
Thank you very much for your time and the explanation. I agree that using an event-time window could cause messages to be dropped, but my use case is as follows. Before that, I should clarify that I tried both parallelism 1 and parallelism 2, yet the problem remains the same: some events are dropped. – Sharath
My use case is to process a set of events whenever a business rule evaluates to true, for example when the total number of events is greater than 3, or the count of even-valued events is greater than 5, or a predefined time window is exceeded (say 2 seconds). Also, if my understanding is right, once you override the window trigger with your own, the original trigger is no longer considered, in this case the passage of the window's time. I am using env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); and env.setParallelism(1);. @Jicaar I appreciate your input. – Sharath
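The firing rule described above (count over 3, or more than 5 even values, or roughly 2 seconds elapsed) can be isolated from Flink as a pure predicate. A minimal sketch, with the class name `FirePolicy` and all thresholds as hypothetical parameters:

```java
class FirePolicy {
    // Fire when total events > 3, OR even-valued events > 5,
    // OR more than maxWaitMillis have elapsed since the window opened.
    // In a real Flink Trigger, the counts would live in partitioned state
    // and the timeout would be a registered processing-time timer.
    static boolean shouldFire(long totalCount, long evenCount,
                              long nowMillis, long windowStartMillis,
                              long maxWaitMillis) {
        return totalCount > 3
                || evenCount > 5
                || (nowMillis - windowStartMillis) > maxWaitMillis;
    }

    public static void main(String[] args) {
        System.out.println(shouldFire(4, 0, 0, 0, 2000));    // count > 3
        System.out.println(shouldFire(2, 6, 0, 0, 2000));    // even count > 5
        System.out.println(shouldFire(1, 1, 2500, 0, 2000)); // 2 s exceeded
    }
}
```

Keeping the rule in one place like this makes it easy to unit-test the business logic separately from the trigger plumbing.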
Including a custom trigger does override the default trigger, but what fires the custom trigger still applies. So when the 3000 ms TumblingEventTime window completes, it calls the custom onEventTime method in your trigger. But you have onEventTime set to only CONTINUE and never FIRE and/or PURGE (whereas the default trigger would return FIRE_AND_PURGE), which makes the event-time window essentially meaningless, from what I can tell. – Jicaar
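The fix Jicaar describes is to have onEventTime fire and purge when the timer for the window's end comes due, instead of always continuing. A self-contained sketch of just that decision (the enum here stands in for Flink's TriggerResult; in the real Trigger, `windowMaxTimestamp` would be `window.maxTimestamp()`):

```java
class EventTimeFireFix {
    enum TriggerResult { CONTINUE, FIRE_AND_PURGE }

    // When the event-time timer for the window's end fires, flush the
    // window; any other timer is ignored. This restores the behavior the
    // default EventTimeTrigger provides, which the custom trigger above
    // loses by always returning CONTINUE from onEventTime.
    static TriggerResult onEventTime(long time, long windowMaxTimestamp) {
        return time >= windowMaxTimestamp
                ? TriggerResult.FIRE_AND_PURGE // window end reached: flush
                : TriggerResult.CONTINUE;      // some other timer: keep waiting
    }

    public static void main(String[] args) {
        System.out.println(onEventTime(2999, 2999)); // FIRE_AND_PURGE
        System.out.println(onEventTime(1000, 2999)); // CONTINUE
    }
}
```

With this in place, elements buffered when the tumbling window closes before the count threshold is hit are emitted rather than silently discarded.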