2017-07-17 34 views
0

我試圖通過延時最多30秒的分鐘時間戳來加密60秒數據。Apache Flink。加水印的開窗

​​

我正在接收數據。水印和時間戳正在設置中。看起來,彙總的數據永遠不會發送到ohlcStreamAggregated,因此它們不會被記錄。

public TimestampExtractor(Time maxDelayInterval) { 
     if (maxDelayInterval.toMilliseconds() < 0) { 
      throw new RuntimeException("This parameter must be positive or 0.); 
     } 
     this.maxDelayInterval = maxDelayInterval.toMilliseconds()/1000; 
     this.currentMaxTimestamp = Long.MIN_VALUE + this.maxDelayInterval; 
    } 

@Override 
public final Watermark getCurrentWatermark() { 
     // set maximum delay 30 seconds 
     long potentialWM = currentMaxTimestamp - maxDelayInterval; 
     if (potentialWM > lastEmittedWM) { 
      lastEmittedWM = potentialWM; 
     } 
     return new Watermark(lastEmittedWM); 
    } 
@Override 
public final long extractTimestamp(StockTrade stockTrade, long previousElementTimestamp) { 
     BigDecimal bd = new BigDecimal(stockTrade.getTime()); 
     long timestamp = bd.longValue(); 
     //set the maximum seen timestamp so far 
     if (timestamp > currentMaxTimestamp) { 
      currentMaxTimestamp = timestamp; 
     } 
     return timestamp; 
    } 

我用this example作爲模板。

回答

0

它會更容易診斷您的應用程序,如果你可以分享整個事情(也許在一個要點),但是,你有沒有:

  • 設定的時間特性事件時間(docs)?
  • 調用在流執行環境中執行?

此外,您的時間戳提取可能會更簡單一些。更多類似這樣的:

public static class TimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<StockTrade> { 
    public TimestampExtractor() { 
     super(Time.seconds(30)); 
    } 

    @Override 
    public long extractTimestamp(StockTrade trade) { 
     return trade.getTime(); 
    } 
} 
+0

是的,我設置了env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);和env.execute(「從卡夫卡話題交易」);由於第一個採集流程正常,日誌記錄等,我認爲問題在於窗口。如果您可以向我發送電子郵件,我們可以進一步討論。多謝。 –