2016-12-13 25 views
0

在Google數據流中使用來自Cloud PubSub訂閱的流式無界PCollection。我們正在使用這個技術來簡單地將事件傳遞給BigTable。與交付的一切都表現很好。雲數據流:水印前進時的副作用

我們的問題是,我們有下游批處理作業,希望在交付後從BigTable中讀取一天的數據。我想利用窗口化和觸發來實現一種副作用,當水印超過日期閾值時,會將標記行寫入bigtable中,表明數據流有理由相信大部分事件已經交付(我們不'對完整性需要強有力的保證,合理的保證),並且下游處理可以開始。

我們試過的是將原始事件寫入管道中的一個接收器,然後使用the timing information in the pane窗口進入另一個接收器,以確定水印是否已經提前。這種方法的問題在於它再次對原始事件進行操作,這是不希望的,因爲它會重複寫入事件行。我們可以防止這種寫入,但管道中的並行路徑仍然會在窗口事件流上運行。

有沒有一種有效的方法來將水印的回調附加到水印上,以便我們可以在水印前進時執行單個動作?

回答

1

設置在事件時間的定時器和接收回調的一般能力確實是一個重要的特徵請求日提交的作爲BEAM-27,這是正在積極開發。

但是實際上,您使用僅僅使用Dataflow Java SDK 1.x的功能來加入FixedWindows.of(Duration.standardDays(1))的方法似乎可以實現您的目標。您可以通過添加觸發器AfterPane.elementCountAtLeast(1)來維持「流水線」行爲,而不是分叉管道。它會產生GroupByKey的成本,但不會重複任何操作。

完整的管道可能是這樣的:

pipeline 
    // Read your data from Cloud Pubsub and parse to MyValue 
    .apply(PubsubIO.Read.topic(...).withCoder(MyValueCoder.of()) 

    // You'll need some keys 
    .apply(WithKeys.<MyKey, MyValue>of(...)) 

    // Window into daily windows, but still output as fast as possible 
    .apply(Window.into(FixedWindows.of(Duration.standardDays(1))) 
       .triggering(AfterPane.elementCountAtLeast(1))) 

    // GroupByKey adds the necessary EARLY/ON_TIME/LATE labeling 
    .apply(GroupByKey.<MyKey, MyValue>create()) 

    // Convert KV<MyKey, Iterable<MyValue>> 
    // to KV<ByteString, Iterable<Mutation>> 
    // where the iterable of mutations has the "end of day" marker if 
    // it was ON_TIME 
    .apply(MapElements.via(new MessageToMutationWithEndOfWindow()) 

    // Write it! 
    .apply(BigTableIO.Write.to(...); 

請做我的答案評論,如果我錯過了你的使用情況的一些細節。