我們已經解決了我們在AWS(亞馬遜網絡服務)中運行的工作流的這個問題,用於存儲在S3中的數據。
我們的設置:
- 數據存儲:AWS S3
- 數據攝取機制:水槽
- 工作流管理:Oozie的
- 存儲的文件狀態:MySQL的
問題:
我們使用Flume將數據導入Amazon S3。所有攝入的數據都在同一個文件夾中(S3是一個鍵/值存儲區,沒有文件夾概念,這裏的文件夾意味着所有的數據都有相同的前綴,例如/tmp/1.txt,/tmp/2.txt等。/ tmp /是關鍵字前綴)。
我們有一個ETL工作流程,計劃在一個小時內運行一次。但是,由於所有數據都被攝入了相同的文件夾,因此我們必須區分處理的和未處理的文件。
例如,對於攝取第1小時的數據是:
/tmp/1.txt
/tmp/2.txt
當工作流首次啓動時,它應該從「1.txt的」和「2.txt」處理數據,並將其標記爲加工。
如果第二小時,食入的數據是:
/tmp/3.txt
/tmp/4.txt
/tmp/5.txt
然後,在文件夾中的總數據後2小時將是:
/tmp/1.txt
/tmp/2.txt
/tmp/3.txt
/tmp/4.txt
/tmp/5.txt
由於 「1.txt的」和「2.txt」已經被處理並標記爲已處理,在第二次運行期間,作業應該只處理「3.txt」,「4.txt」和「5.txt」。
解決方案:
我們開發的庫(我們稱其爲FileManager
),用於管理處理的文件列表。我們將這個庫插入到Oozie工作流中,作爲Java操作。這是工作流程的第一步。
該庫還處理忽略Flume當前正在寫入的文件。當Flume將數據寫入文件時,這些文件具有「_current」後綴。所以,這些文件被忽略處理,直到它們完全寫入。
攝入的文件是以時間戳作爲後綴生成的。對於例如「hourly_feed.1234567」。所以,文件名是按照他們創建的升序排列的。
爲了獲取未處理文件的列表,我們使用了S3的使用標記進行查詢的功能(例如,如果文件夾中有10,000個文件,如果您將標記指定爲第5,000個文件的名稱,那麼S3會返回給您文件從5001到10,000)。
我們有以下的每個文件的3種狀態:
- 成功 - 文件,這是成功處理
- 錯誤 - 文件,這被拾起進行處理,但有一個錯誤在處理這些文件。因此,需要進行處理
- IN_PROGRESS重新拾起這些文件 - 文件已經拿起了處理,目前正由一個工作
對每個文件進行處理,我們存儲以下詳細MySQL數據庫:
- 文件名
- 最後修改時間 - 我們用這個來處理一些極端情況的文件
- 狀態(IN_PROGRESS,成功,錯誤)
的FileManager
暴露的以下接口:
GetLatestFiles
:返回的最新未處理的文件
UpdateFileStatus
名單:處理後文件,更新文件的狀態
以下是跟着來識別文件的步驟,而尚未處理:
- 查詢數據庫(MySQL的),檢查最後文件,其中有成功(查詢狀態:
order by created desc
)。
- 如果第一步驟返回一個文件,然後用設置爲最後成功處理的文件的文件標記查詢S3。這將返回上次成功處理文件後攝入的所有文件。
- 也查詢數據庫,以檢查是否有錯誤狀態的任何文件。這些文件需要重新處理,因爲以前的工作流程沒有成功處理它們。
- 返回從步驟2和3所獲得的文件列表(返回它們之前,標記爲IN_PROGRESS他們的狀態)。
- 作業後成功完成更新所有處理過的文件的狀態成功。如果是在處理文件錯誤,然後更新爲錯誤所有文件的狀態(這樣他們可以撿起來處理接下來的時間)
我們使用Oozie的工作流管理。 Oozie的工作流有以下步驟:
- 步驟1:取下一組文件要被處理時,標記每個它們的狀態的作爲IN_PROGRESS,並將它們傳遞到下一階段
- 步驟2:處理的文件
- 步驟3:更新的處理的狀態(SUCCESS或ERROR)
德duplicat離子: 當你實現這樣的庫,有重複的記錄的可能性(在某些角落情況下,同一個文件可以被用於處理拿起兩次)。我們實施了一個去重複邏輯來刪除重複的記錄。
我們正好解決同樣的問題在AWS S3我們的Hadoop作業。你的文件系統是HDFS嗎?如果是,那麼識別和只讀取上次未處理的文件可能並不那麼容易。 –
我的輸入來自NOSQL數據庫而非HDFS。您能否解釋一下您的方法?我主要關心的是在作業運行時添加的那些記錄。我不知道他們會發生什麼。他們是否在目前的工作和下一份工作中得到處理,或者兩者兼得? –
我已經使用我們在S3中爲我們的工作流實施的解決方案更新了答案。 –