2017-02-24 53 views
0

我正在處理一個流式數據流管道,它使用來自PubSub的批量項目消息並最終將它們寫入數據存儲。爲了獲得更好的並行性,並且爲了及時確認從PubSub提取的消息,我將這些批次拆分爲單個項目並在其後添加一個融合破解器。爲什麼我的聚變斷路器會丟失或阻止數據?

所以管道看起來像這樣...

PubSubIO - >反序列化 - >解包 - >融合突破 - >驗證/轉換 - > DatastoreIO。

這裏是我的熔合斷路器,主要是從the JdbcIO class複製。它使用觸發器來分解全局窗口中的數據。

public class BreakFusionTransform<T> extends PTransform<PCollection<T>, PCollection<T>> { 

    @Override 
    public PCollection<T> expand(PCollection<T> input) { 
    return input 
     .apply(ParDo.of(new RandomKeyFn<T>())) 
     .apply(Window.<KV<Integer, T>>triggering(
      Repeatedly.forever(
       AfterProcessingTime 
        .pastFirstElementInPane() 
        .plusDelayOf(Duration.standardSeconds(2L)))) 
      .discardingFiredPanes()) 
     .apply(GroupByKey.<Integer, T>create()) 
     .apply(Values.<Iterable<T>>create()) 
     .apply(Flatten.<T>iterables()); 
    } 

    private static class RandomKeyFn<T> extends DoFn<T, KV<Integer, T>> { 
    private Random random; 

    @Setup 
    public void setup() { 
     random = new Random(); 
    } 

    @ProcessElement 
    public void processElement(ProcessContext context) { 
     context.output(KV.of(random.nextInt(), context.element())); 
    } 
    } 
} 

它工作的大部分時間,但在多個場合,當它產生更少的輸出大於輸入數數,流輸入完成後,即使與管道十分鐘進入空閒狀態。

如下面的數據流作業監視控制檯所示。在我等待大約10分鐘的數據出現轉換後,屏幕截圖是在作業完成後進行的。

enter image description here

*有人能想到爲一個解釋?感覺好像融合破壞者已經退縮或失去了一些物品。 *

我注意到它只發生在數據量/數據速率很高時,迫使管道在測試運行的中間放大,從25人增加到50人n1-highmem-2工人。但是,我還沒有做足夠的測試來驗證放大是否是重現此問題的關鍵。

或者也許觸發器每隔兩秒就會過火一次?

我正在使用Dataflow 2.0.0-beta1。工作ID是「2017-02-23_23_15_34-14025424484787508627」。

回答

1

Streaming Dataflow中的計數器是盡力而爲的措施;特別是自動縮放會導致更大的差異。在這種情況下,管道不應該丟失數據。

相關問題