0
我試圖通過延時最多30秒的分鐘時間戳來加密60秒數據。Apache Flink。加水印的開窗
我正在接收數據。水印和時間戳正在設置中。看起來,彙總的數據永遠不會發送到ohlcStreamAggregated,因此它們不會被記錄。
public TimestampExtractor(Time maxDelayInterval) {
if (maxDelayInterval.toMilliseconds() < 0) {
throw new RuntimeException("This parameter must be positive or 0.);
}
this.maxDelayInterval = maxDelayInterval.toMilliseconds()/1000;
this.currentMaxTimestamp = Long.MIN_VALUE + this.maxDelayInterval;
}
@Override
public final Watermark getCurrentWatermark() {
// set maximum delay 30 seconds
long potentialWM = currentMaxTimestamp - maxDelayInterval;
if (potentialWM > lastEmittedWM) {
lastEmittedWM = potentialWM;
}
return new Watermark(lastEmittedWM);
}
@Override
public final long extractTimestamp(StockTrade stockTrade, long previousElementTimestamp) {
BigDecimal bd = new BigDecimal(stockTrade.getTime());
long timestamp = bd.longValue();
//set the maximum seen timestamp so far
if (timestamp > currentMaxTimestamp) {
currentMaxTimestamp = timestamp;
}
return timestamp;
}
我用this example作爲模板。
是的,我設置了env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);和env.execute(「從卡夫卡話題交易」);由於第一個採集流程正常,日誌記錄等,我認爲問題在於窗口。如果您可以向我發送電子郵件,我們可以進一步討論。多謝。 –