1
是否可以使用兩個鍵中存在的鍵來連接兩個單獨的PubSubIo無界PCollections?我嘗試用類似的方式完成任務:加入兩條流
閱讀(FistStream)&閱讀(SecondStream) - >拼合 - >生成用於加入的鍵 - >使用會話窗口將它們聚集在一起 - >按鍵組合,然後重新窗口化與固定大小的窗口 - > AvroIOWrite到磁盤使用窗口。
編輯:
這裏是我創建的管道代碼。我遇到兩個問題:
- 沒有獲取的寫入磁盤
- 管道開始真正不穩定 - 的某些步驟,它隨機減慢處理。特別是由...組合。即使使用10個數據流工作人員,也無法跟上攝取速度。
我需要每秒處理約10 000個會話。每個會話包含1或2個事件,然後需要關閉。
PubsubIO.Read<String> auctionFinishedReader = PubsubIO.readStrings().withTimestampAttribute(TIMESTAMP_ATTRIBUTE)
.fromTopic("projects/authentic-genre-152513/topics/auction_finished");
PubsubIO.Read<String> auctionAcceptedReader = PubsubIO.readStrings().withTimestampAttribute(TIMESTAMP_ATTRIBUTE)
.fromTopic("projects/authentic-genre-152513/topics/auction_accepted");
PCollection<String> auctionFinishedStream = p.apply("ReadAuctionFinished", auctionFinishedReader);
PCollection<String> auctionAcceptedStream = p.apply("ReadAuctionAccepted", auctionAcceptedReader);
PCollection<String> combinedEvents = PCollectionList.of(auctionFinishedStream)
.and(auctionAcceptedStream).apply(Flatten.pCollections());
PCollection<KV<String, String>> keyedAuctionFinishedStream = combinedEvents
.apply("AddKeysToAuctionFinished", WithKeys.of(new GenerateKeyForEvent()));
PCollection<KV<String, Iterable<String>>> sessions = keyedAuctionFinishedStream
.apply(Window.<KV<String, String>>into(Sessions.withGapDuration(Duration.standardMinutes(1)))
.withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
.apply(GroupByKey.create());
PCollection<SodaSession> values = sessions
.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, SodaSession>() {
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) {
c.output(new SodaSession("auctionid", "stattedat"));
}
}));
PCollection<SodaSession> windowedEventStream = values
.apply("ApplyWindowing", Window.<SodaSession>into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1))
))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes()
);
AvroIO.Write<SodaSession> avroWriter = AvroIO
.write(SodaSession.class)
.to("gs://storage/")
.withWindowedWrites()
.withFilenamePolicy(new EventsToGCS.PerWindowFiles("sessionsoda"))
.withNumShards(3);
windowedEventStream.apply("WriteToDisk", avroWriter);
您是否在https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java上看到了JoinExamples? – ravwojdyla
這聽起來像你的方法應該工作。什麼不起作用? –
我編輯了我的文章並提供了有關該問題的更多信息。是否有其他方式來加入流? –