我正在處理一個流式數據流管道,它使用來自PubSub的批量項目消息並最終將它們寫入數據存儲。爲了獲得更好的並行性,並且爲了及時確認從PubSub提取的消息,我將這些批次拆分爲單個項目並在其後添加一個融合破解器。爲什麼我的聚變斷路器會丟失或阻止數據?
所以管道看起來像這樣...
PubSubIO - >反序列化 - >解包 - >融合突破 - >驗證/轉換 - > DatastoreIO。
這裏是我的熔合斷路器,主要是從the JdbcIO class複製。它使用觸發器來分解全局窗口中的數據。
public class BreakFusionTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {
@Override
public PCollection<T> expand(PCollection<T> input) {
return input
.apply(ParDo.of(new RandomKeyFn<T>()))
.apply(Window.<KV<Integer, T>>triggering(
Repeatedly.forever(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(2L))))
.discardingFiredPanes())
.apply(GroupByKey.<Integer, T>create())
.apply(Values.<Iterable<T>>create())
.apply(Flatten.<T>iterables());
}
private static class RandomKeyFn<T> extends DoFn<T, KV<Integer, T>> {
private Random random;
@Setup
public void setup() {
random = new Random();
}
@ProcessElement
public void processElement(ProcessContext context) {
context.output(KV.of(random.nextInt(), context.element()));
}
}
}
它工作的大部分時間,但在多個場合,當它產生更少的輸出大於輸入數數,流輸入完成後,即使與管道十分鐘進入空閒狀態。
如下面的數據流作業監視控制檯所示。在我等待大約10分鐘的數據出現轉換後,屏幕截圖是在作業完成後進行的。
*有人能想到爲一個解釋?感覺好像融合破壞者已經退縮或失去了一些物品。 *
我注意到它只發生在數據量/數據速率很高時,迫使管道在測試運行的中間放大,從25人增加到50人n1-highmem-2工人。但是,我還沒有做足夠的測試來驗證放大是否是重現此問題的關鍵。
或者也許觸發器每隔兩秒就會過火一次?
我正在使用Dataflow 2.0.0-beta1。工作ID是「2017-02-23_23_15_34-14025424484787508627」。