我正在構建一個將在Google Cloud Dataflow中運行的Apache Beam(v2.0)管道。預期流程如下:Beam - 會話窗口無法按預期發送結果
- 來自Pub/Sub的事件流(無界數據源)。它們是簡單的JSON對象,具有
sessionId
屬性。 - 使用自定義的DoFn事件到
KV<String, String>
,其中他們的關鍵是sessionId
並且該值是整個JSON對象。 - 使用會話窗口的窗口事件(開發時間間隔爲2秒,生產時間約爲30分鐘)。
- 現在,只是打印從每個窗口
這裏發出的是管道碼結果:
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(PubsubIO
.readStrings()
.fromSubscription(options.getSubscription()))
.apply("AddKeyFn", ParDo.of(new DoFn<String, KV<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Gson gson = new Gson();
String key = (String) gson.fromJson(c.element(), HashMap.class).get("sessionId");
KV<String, String> kv = KV.of(key, c.element());
c.output(kv);
}
}))
.apply(Window.<KV<String, String>>into(Sessions.withGapDuration(Duration.standardSeconds(2))))
.apply("PrintFn", ParDo.of(new DoFn<KV<String, String>, Void>() {
@ProcessElement
public void processElement(ProcessContext c) {
System.out.println("****");
System.out.println(c.element());
System.out.println(c.timestamp());
}
}));
return pipeline.run();
我想窗函數每次會話結束髮射的結果,爲每個會話(基於密鑰)。出於測試目的,我使用pub/sub模擬器,並且只是隨機發送數據。
所以,舉例來說,如果下面的數據被髮送到發佈/訂閱:
{"sessionId": "2", "data": "data9583", "timestamp": 1507293681}
{"sessionId": "3", "data": "data5220", "timestamp": 1507293683}
{"sessionId": "6", "data": "data2998", "timestamp": 1507293684}
{"sessionId": "3", "data": "data3820", "timestamp": 1507293684}
{"sessionId": "6", "data": "data5728", "timestamp": 1507293685}
{"sessionId": "6", "data": "data7173", "timestamp": 1507293686}
{"sessionId": "4", "data": "data8800", "timestamp": 1507293687}
窗函數應該發出以下內容:
- 一號窗口:包含事件與
sessionId=2
- 第2個窗口:包含2個事件
sessionId=3
- 第3個窗口:包含3個事件
sessionId=6
- 第四窗口:包含1個事件與
sessionId=4
這裏的想法是:
- 的Windows只會發出一次會議是「完整」,這是說
{gapDuration}
以來已經過去了與去年的sessionId事件 - 每個窗口將包含一個單一的會話事件(因爲我們已經通過
KV<String, String>
到窗口功能)
上面的窗口函數直接從Beam documentation中拉出。
什麼我實際上看到的是:
- 每個事件都被擊中時發佈/訂閱立即打印,所以管道甚至沒有等待
{gapDuration}
發射窗口 - 每個打印語句包含單事件
值得注意的是,如果我添加自定義CombineFn
(它只是將JSON對象到JSON對象的數組),沒有什麼使得它到CombineFn
也沒有到PrintFn
(我在CombineFn
內添加了打印語句)。
我假設觸發與此有關,但似乎無法找到任何有用的東西來設置我在正確的方向(有一個令人驚訝的少量示例代碼爲梁,尤其是對於V2 .0
我的問題:
- 是我所期望的行爲可能
- 如果是這樣,我失去了什麼是這種方法至少在正確的軌道
- 如果任何人都可以點???我是一個很好的例子公司的來源de爲各種各樣的Beam管道用例,那會很棒!
資源我已經沖刷沒有成功:
- 以外的世界批次:流101 & 102
- "Complete" Examples from Beam Github
- Beam JavaDoc