2017-07-17 95 views
0

我無法選擇適當的窗口功能/分配器。任務如下。首先,我從具有request_id和一些數據的Source獲取數據,並對外部數據庫執行異步請求。Apache Flink異步請求和窗口

// Here String is for request_id, Data is for treated data 
DataStream Tuple2<String, Data> stream = ... 

// async I/O queries 
DataStream<Tuple2<String, String>> resultStream = 
AsyncDataStream.unorderedWait(
    stream, 
    new AsyncDatabaseRequest(), 
    1000, 
    TimeUnit.MILLISECONDS, 
    100 
); 

現在我想通過request_id收集所有數據並進行一些計算。

DataStream Tuple2<String, Integer> = result 
    .map(val -> new Tuple2<String, Integer>(val.f0, val.f1.data_int)) 
    .keyBy(0) 
    .window(...) 
    .sum(1); 

問題是窗口函數。我需要每個窗口包含具有相同request_id的所有數據點,但異步查詢的時間可能從毫秒到分鐘不等。另一方面,我需要低延遲,所以我不能使用ProcessingTimeSessionWindows.withGap(Time.minutes(10))。我需要在從異步函數獲取最後一個數據後立即執行計算。

對我來說最好的方法是使用異步函數的窗口水印,當然每個查詢都會完成,以及它如何指向它。這是可能的,這樣的任務的最佳做法是什麼?

回答

0

那麼,我找到了解決方案,它似乎很容易。 我只使用EventTime。在我的源功能我產生事件時間戳以及水印如下:

Long ts = System.currentTimeMillis(); 
ctx.collectWithTimestamp(data, ts); 
ctx.emitWatermark(new Watermark(ts + 1)); 

在河流流量我用EVENTTIME功能:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
DataStream<...> dataStream = ...; 

DataStream<...> newStream = dataStream 
    .keyBy(0) 
    .timeWindow(Time.milliseconds(1)) 
    .reduce(new Reducer()); 

這樣,我避免超時,結果立即準備。