2016-01-18 50 views
2

比方說,我有問題更新和用戶更新流。這些流包括每種類型的實體的「創建」消息。問題通過questioner_id與用戶相關。在流模式下使用數據流反規範化

典型的問題更新看起來像{qid:3 type:「Create」,questioner_id:5},{qid:3 type:「Comment」}。

典型用戶更新看起來像{UID:5類型: 「更新」 狀態: 「CA」},{UID:5類型: 「更新」 狀態: 「TX」}

而且我想最後question_facts數據集看起來像{ts:x qid:3 comments:1 user_state:「TX」},其中包含每個問題更新和發生問題「創建」事件後發生的用戶更新的條目。

要做到這一點的方式是有兩個PCollections,一個窗口在所有時間,然後CoGroupByKey?

回答

3

如果您的管道輸入是固定的數據集,那麼您建議的方法將起作用。

如果您有更新流並想要輸出結果流,則需要確切地確定何時需要輸出。例如,通過在輸入窗口中設置AfterPane.elementCountAtLeast(1)的觸發器,每次發生更新時都可以輸出。

+0

聽起來像我們想要的。輸入是流,我們希望每次更新任何一個源時都記錄一條新記錄。我會編輯我的問題。乾杯 – bfabry