2017-10-06 38 views
2

我正在構建一個將在Google Cloud Dataflow中運行的Apache Beam(v2.0)管道。預期流程如下:Beam - 會話窗口無法按預期發送結果

  • 來自Pub/Sub的事件流(無界數據源)。它們是簡單的JSON對象,具有sessionId屬性。
  • 使用自定義的DoFn事件到KV<String, String>,其中他們的關鍵是sessionId並且該值是整個JSON對象。
  • 使用會話窗口的窗口事件(開發時間間隔爲2秒,生產時間約爲30分鐘)。
  • 現在,只是打印從每個窗口

這裏發出的是管道碼結果:

Pipeline pipeline = Pipeline.create(options); 

    pipeline.apply(PubsubIO 
        .readStrings() 
        .fromSubscription(options.getSubscription())) 

     .apply("AddKeyFn", ParDo.of(new DoFn<String, KV<String, String>>() { 
      @ProcessElement 
      public void processElement(ProcessContext c) { 
       Gson gson = new Gson(); 
       String key = (String) gson.fromJson(c.element(), HashMap.class).get("sessionId"); 
       KV<String, String> kv = KV.of(key, c.element()); 
       c.output(kv); 
      } 
      })) 


     .apply(Window.<KV<String, String>>into(Sessions.withGapDuration(Duration.standardSeconds(2)))) 

     .apply("PrintFn", ParDo.of(new DoFn<KV<String, String>, Void>() { 
      @ProcessElement 
      public void processElement(ProcessContext c) { 
       System.out.println("****"); 
       System.out.println(c.element()); 
       System.out.println(c.timestamp()); 
      } 
      })); 

     return pipeline.run(); 

我想窗函數每次會話結束髮射的結果,爲每個會話(基於密鑰)。出於測試目的,我使用pub/sub模擬器,並且只是隨機發送數據。

所以,舉例來說,如果下面的數據被髮送到發佈/訂閱:

{"sessionId": "2", "data": "data9583", "timestamp": 1507293681} 
{"sessionId": "3", "data": "data5220", "timestamp": 1507293683} 
{"sessionId": "6", "data": "data2998", "timestamp": 1507293684} 
{"sessionId": "3", "data": "data3820", "timestamp": 1507293684} 
{"sessionId": "6", "data": "data5728", "timestamp": 1507293685} 
{"sessionId": "6", "data": "data7173", "timestamp": 1507293686} 
{"sessionId": "4", "data": "data8800", "timestamp": 1507293687} 

窗函數應該發出以下內容:

  • 一號窗口:包含事件與sessionId=2
  • 第2個窗口:包含2個事件sessionId=3
  • 第3個窗口:包含3個事件sessionId=6
  • 第四窗口:包含1個事件與sessionId=4

這裏的想法是:

  • 的Windows只會發出一次會議是「完整」,這是說{gapDuration}以來已經過去了與去年的sessionId事件
  • 每個窗口將包含一個單一的會話事件(因爲我們已經通過KV<String, String>到窗口功能)

上面的窗口函數直接從Beam documentation中拉出。

什麼我實際上看到的是:

  • 每個事件都被擊中時發佈/訂閱立即打印,所以管道甚至沒有等待{gapDuration}發射窗口
  • 每個打印語句包含單事件

值得注意的是,如果我添加自定義CombineFn(它只是將JSON對象到JSON對象的數組),沒有什麼使得它到CombineFn也沒有PrintFn(我在CombineFn內添加了打印語句)。

我假設觸發與此有關,但似乎無法找到任何有用的東西來設置我在正確的方向(有一個令人驚訝的少量示例代碼爲梁,尤其是對於V2 .0

我的問題:

  • 是我所期望的行爲可能
  • 如果是這樣,我失去了什麼是這種方法至少在正確的軌道
  • 如果任何人都可以點???我是一個很好的例子公司的來源de爲各種各樣的Beam管道用例,那會很棒!

資源我已經沖刷沒有成功:

回答

1

首先,窗口功能需要在窗口r之間合併元素要求應用聚合操作,例如GroupByKey或Combine。這在波束編程指南Windowing Basics下討論。

其次,默認情況下(如您使用的)PubSub會根據發佈時間將時間戳分配給元素。由於您有明確的時間戳字段,因此您可能需要考慮使用timestamp屬性發布這些元素,並使用withTimestampAttribute method讀取它們。這將使用您發佈的timestamp屬性作爲時間戳。