2017-06-05 30 views
1

是否可以使用兩個鍵中存在的鍵來連接兩個單獨的PubSubIo無界PCollections?我嘗試用類似的方式完成任務:加入兩條流

閱讀(FistStream)&閱讀(SecondStream) - >拼合 - >生成用於加入的鍵 - >使用會話窗口將它們聚集在一起 - >按鍵組合,然後重新窗口化與固定大小的窗口 - > AvroIOWrite到磁盤使用窗口。

編輯:

這裏是我創建的管道代碼。我遇到兩個問題:

  1. 沒有獲取的寫入磁盤
  2. 管道開始真正不穩定 - 的某些步驟,它隨機減慢處理。特別是由...組合。即使使用10個數據流工作人員,也無法跟上攝取速度。

我需要每秒處理約10 000個會話。每個會話包含1或2個事件,然後需要關閉。

PubsubIO.Read<String> auctionFinishedReader = PubsubIO.readStrings().withTimestampAttribute(TIMESTAMP_ATTRIBUTE) 
      .fromTopic("projects/authentic-genre-152513/topics/auction_finished"); 
    PubsubIO.Read<String> auctionAcceptedReader = PubsubIO.readStrings().withTimestampAttribute(TIMESTAMP_ATTRIBUTE) 
      .fromTopic("projects/authentic-genre-152513/topics/auction_accepted"); 

    PCollection<String> auctionFinishedStream = p.apply("ReadAuctionFinished", auctionFinishedReader); 
    PCollection<String> auctionAcceptedStream = p.apply("ReadAuctionAccepted", auctionAcceptedReader); 

    PCollection<String> combinedEvents = PCollectionList.of(auctionFinishedStream) 
      .and(auctionAcceptedStream).apply(Flatten.pCollections()); 

    PCollection<KV<String, String>> keyedAuctionFinishedStream = combinedEvents 
      .apply("AddKeysToAuctionFinished", WithKeys.of(new GenerateKeyForEvent())); 

    PCollection<KV<String, Iterable<String>>> sessions = keyedAuctionFinishedStream 
      .apply(Window.<KV<String, String>>into(Sessions.withGapDuration(Duration.standardMinutes(1))) 
              .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)) 
      .apply(GroupByKey.create()); 

    PCollection<SodaSession> values = sessions 
      .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, SodaSession>() { 
       @ProcessElement 
       public void processElement(ProcessContext c, BoundedWindow window) { 
        c.output(new SodaSession("auctionid", "stattedat")); 
       } 

    })); 

    PCollection<SodaSession> windowedEventStream = values 
      .apply("ApplyWindowing", Window.<SodaSession>into(FixedWindows.of(Duration.standardMinutes(2))) 
        .triggering(Repeatedly.forever(
          AfterProcessingTime.pastFirstElementInPane() 
            .plusDelayOf(Duration.standardMinutes(1)) 
        )) 
        .withAllowedLateness(Duration.ZERO) 
        .discardingFiredPanes() 
      ); 

    AvroIO.Write<SodaSession> avroWriter = AvroIO 
      .write(SodaSession.class) 
      .to("gs://storage/") 
      .withWindowedWrites() 
      .withFilenamePolicy(new EventsToGCS.PerWindowFiles("sessionsoda")) 
      .withNumShards(3); 

    windowedEventStream.apply("WriteToDisk", avroWriter); 
+0

您是否在https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java上看到了JoinExamples? – ravwojdyla

+0

這聽起來像你的方法應該工作。什麼不起作用? –

+0

我編輯了我的文章並提供了有關該問題的更多信息。是否有其他方式來加入流? –

回答

1

我找到了一個有效的解決方案。由於我的收藏之一與其他收藏不相稱,所以我使用側面輸入來加速分組操作。以下是我的解決方案的概述:

  1. 讀取兩個事件流。
  2. 將它們拼合成單個PCollection。
  3. 使用滑動窗口大小(可關閉會話持續時間+會話最大長度,每個可關閉的會話持續時間)。
  4. 再次分區收集。
  5. 從較小的PCollection創建PCollectionView。
  6. 將sideInput與前一步中創建的視圖合併到兩個流中。
  7. 將會話寫入磁盤。

它處理連接4000個事件/秒數據流(較大的一個)+260個事件/秒數據流在1-2個DataFlow工作人員與~15個工作人員使用會話窗口和GroupBy。