1
我一直在嘗試使用flink窗口的示例,並驗證窗口的時間,我向流事件添加了時間戳。我發現窗口的持續時間少於窗口的長度。此外,如果我要使用滑動窗口並修改事件,則會將修改後的事件導入下一個窗口。窗口未完成其窗口長度
當我指定窗口長度時,是否不等待窗口完成?滑動窗口之間的重疊事件是指同一個實例? (我知道,流是不可改變的結構)
public class WindowDemo {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
Properties prop=PropertyLoader.loadPropertiesForConsumer("WC",0);
FlinkKafkaConsumer09<Alarm> consumer= new FlinkKafkaConsumer09<Alarm>("topic_smartEmse", new AlarmSchema(), prop);
DataStream<Alarm> inputStream= env.addSource(consumer);
inputStream= inputStream.flatMap(new FlatMapFunction<Alarm, Alarm>() {
@Override
public void flatMap(Alarm value, Collector<Alarm> out)
throws Exception {
System.out.println("flatMap Started at "+System.currentTimeMillis());
value.setUserDefined10("IN TIME "+System.currentTimeMillis());
out.collect(value);
System.out.println("flatMap Ended at "+System.currentTimeMillis());
}
});
KeyedStream<Alarm, String> keyedStream= inputStream.keyBy(new KeySelector<Alarm, String>(){
@Override
public String getKey(Alarm value) throws Exception {
System.out.println("getKey Started at "+System.currentTimeMillis());
return "XX";
}});
DataStream<Alarm> dataStream= keyedStream.timeWindow(Time.of(90, TimeUnit.SECONDS)).apply(new WindowFunction<Alarm, Alarm, String, TimeWindow>() {
@Override
public void apply(String key, TimeWindow window,
Iterable<Alarm> input, Collector<Alarm> out)
throws Exception {
System.out.println("timeWindow Started at "+System.currentTimeMillis());
int count=0;
System.out.println("Key : "+key);
System.out.println("Values : "+input);
Iterator<Alarm> itr= input.iterator();
while (itr.hasNext()){
Alarm alarm= itr.next();
alarm.setUserDefined1(""+count++);
out.collect(alarm);
}
System.out.println("timeWindow ended at "+System.currentTimeMillis());
}
});
dataStream= dataStream.flatMap(new FlatMapFunction<Alarm, Alarm>() {
@Override
public void flatMap(Alarm value, Collector<Alarm> out)
throws Exception {
value.setUserDefined11("OUT TIME "+System.currentTimeMillis());
out.collect(value);
}
});
dataStream.printToErr();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
您已共享的代碼,對時間對齊進行手動修復。我想知道這是否應該通過隱式設置時間特性來完成(我已經嘗試過)。 –
這不是我的代碼。它來自flink源代碼:https://github.com/apache/flink/blob/release-1.1.4-rc1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/窗口/ assigners/TumblingProcessingTimeWindows.java – TobiSH
好的,這是有道理的。我還想知道一個(滑動)窗口所做的更改是否可以在其他窗口中看到? –