2016-05-19 73 views
2

我有一個可能損壞/惡意數據的數據集。數據是時間戳。我使用啓發式功能對數據進行評級。經過一段時間後,我知道所有帶有一些ID的新數據項都需要被丟棄,並且它們代表了很大一部分數據(高達40%)。實時流水線反饋迴路

現在我有兩個批次的管道:

  1. 第一個只是運行評級過來的數據。
  2. 第二個過濾掉損壞的數據並運行分析。

我想從批處理模式(比如說,每天運行)切換到在線處理模式(希望得到延遲< 10分鐘)。

第二個管道使用一個全局窗口,使處理變得簡單。當檢測到損壞的數據密鑰時,所有其他記錄都被簡單地丟棄(同樣使用前幾天丟棄的密鑰,因爲預過濾器很容易)。此外,它可以更輕鬆地對輸出數據做出決定,因爲在處理期間,給定密鑰的所有歷史數據都可用。

的主要問題是:我可以創建一個數據流DAG中的循環?假設我想累積給予我處理的每個會話窗口的質量比率,並且如果速率總和超過X,則管道的早期階段中的某些過濾器函數應過濾掉惡意密鑰。

我知道側面輸入,我不知道它是否可以在運行時更改。

我知道,DAG定義不能有周期,但是沒有它是如何實現同樣的結果?這在我腦海

想法是使用側輸出標記ID爲惡意造假無界的輸出/輸入。輸出會將數據轉儲到某個存儲,並且輸入將每小時加載一次,因此可以連接它。

+0

一個相關的問題:http://stackoverflow.com/questions/36927558/how-to-combine-streaming-data-with-large-history-data-set-in-dataflow-beam –

回答

1

在梁編程模型are windowed側面輸入。

所以你走的是正確的道路:將流水線結構化爲兩部分似乎是合理的:1)計算惡意數據的檢測模型,2)將模型作爲側輸入,並將數據作爲主要輸入,並根據模型過濾數據。管道的第二部分將獲得匹配窗口的模型,這看起來正是你想要的。

在實際上,這是在Millwheel paper(第2頁)的主要實施例中,在其數據流的流道是基於一個。