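I am using sliding time windows of size X and period Y. To label each window's output, I want to get the timestamp of the current PCollection window. How can I get the maximum timestamp of the current sliding window?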
PCollection<T> windowedInputs = input
    .apply(Window.<T>into(
        SlidingWindows.of(Duration.standardMinutes(10))
            .every(Duration.standardMinutes(1))));
// Extract a key from each input and run a function per group.
//
// Q: ExtractKey() depends on the time at which the window fires.
// How can I pass the timestamp of windowedInputs to ExtractKey()?
PCollection<KV<K, Iterable<T>>> groupedInputs = windowedInputs
    .apply(ParDo.of(new ExtractKey()))
    .apply(GroupByKey.<K, T>create());
// Run story clustering and write the outputs.
//
// Q: I'd also like to add a window timestamp suffix to the output.
// How can I pass (or get) the timestamp in SomeDoFn()?
PCollection<String> results = groupedInputs.apply(ParDo.of(new SomeDoFn()));
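To make "the maximum timestamp of the current sliding window" concrete, here is a plain-Java sketch (no Beam dependency) of how an element's timestamp maps to the overlapping 10-minute windows that start every minute in the snippet above. It assumes offset 0 and models Beam's IntervalWindow as a half-open interval [start, end) whose maxTimestamp() is end minus 1 ms; the class and method names (SlidingWindowSketch, Interval, assignWindows) are hypothetical and are not Beam API.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {
    // Hypothetical stand-in for a half-open window [startMillis, endMillis).
    static final class Interval {
        final long startMillis;
        final long endMillis;

        Interval(long startMillis, long endMillis) {
            this.startMillis = startMillis;
            this.endMillis = endMillis;
        }

        // Latest instant still inside the window: one millisecond before the exclusive end.
        long maxTimestamp() {
            return endMillis - 1;
        }

        @Override
        public String toString() {
            return "[" + startMillis + ", " + endMillis + ")";
        }
    }

    // Every window of length sizeMillis, starting every periodMillis (offset 0),
    // that contains timestampMillis.
    static List<Interval> assignWindows(long timestampMillis, long sizeMillis, long periodMillis) {
        List<Interval> windows = new ArrayList<>();
        // Start of the most recent window beginning at or before the timestamp.
        long lastStart = timestampMillis - Math.floorMod(timestampMillis, periodMillis);
        // Walk backwards while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMillis - sizeMillis; start -= periodMillis) {
            windows.add(new Interval(start, start + sizeMillis));
        }
        return windows;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // An element at minute 25.5, with 10-minute windows every minute.
        for (Interval w : assignWindows(25 * minute + 30_000L, 10 * minute, minute)) {
            System.out.println(w + " maxTimestamp=" + w.maxTimestamp());
        }
    }
}
```

With size 10 minutes and period 1 minute, each element lands in size/period = 10 windows, so every output is produced once per window, and each copy carries a different window end.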
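Thanks for the suggestion. I'll give it a try. Is there a similar way to pass the window information into ExtractKey() in the code snippet? – user7101240
Sure: just add a BoundedWindow parameter to ExtractKey's @ProcessElement method. – jkff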
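A minimal sketch of that suggestion, assuming the Beam Java SDK is on the classpath and, for concreteness, that the inputs are Strings (the question's generic K and T are replaced). The key logic (first comma-separated field) and the class names are hypothetical. Because GroupByKey preserves the windowing, the downstream DoFn can declare the same BoundedWindow parameter to build the suffix the question asks about.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

public class WindowAwareFns {
    // Hypothetical ExtractKey: Beam supplies the element's window to @ProcessElement.
    static class ExtractKey extends DoFn<String, KV<String, String>> {
        @ProcessElement
        public void processElement(ProcessContext c, BoundedWindow window) {
            // For IntervalWindow (what SlidingWindows assigns), this is the window end minus 1 ms.
            Instant windowMax = window.maxTimestamp();
            // Hypothetical key: first comma-separated field; windowMax is available if the key needs it.
            String key = c.element().split(",", 2)[0];
            c.output(KV.of(key, c.element()));
        }
    }

    // Hypothetical SomeDoFn: runs after GroupByKey, which keeps each group in its window.
    static class SomeDoFn extends DoFn<KV<String, Iterable<String>>, String> {
        @ProcessElement
        public void processElement(ProcessContext c, BoundedWindow window) {
            int count = 0;
            for (String ignored : c.element().getValue()) {
                count++;
            }
            // Suffix each output with the window's maximum timestamp.
            c.output(c.element().getKey() + "," + count + "@" + window.maxTimestamp());
        }
    }
}
```

Since GroupByKey already groups per key and per window, the window does not need to be folded into the key just to keep the sliding windows apart.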