2015-12-01 100 views
3

我正在開發一個計劃每天運行一次的hadoop程序。它需要一堆json文檔,每個文檔都有一個時間戳,顯示何時添加文檔。我的程序只能處理自上次運行以來添加的那些文檔。所以,我需要保持一個狀態,該狀態是顯示我的最後一次操作hadoop作業的時間戳。我正在考慮將此狀態存儲在SQL Server中,並在我的作業的驅動程序中查詢該狀態。這是一個好的解決方案還是更好的解決方案?如何在Hadoop作業中保持狀態?

p.s.我的hadoop作業正在HDInsight上運行。話雖如此,它仍然可以從我的驅動程序查詢SQL服務器?

+0

我們正好解決同樣的問題在AWS S3我們的Hadoop作業。你的文件系統是HDFS嗎?如果是,那麼識別和只讀取上次未處理的文件可能並不那麼容易。 –

+0

我的輸入來自NOSQL數據庫而非HDFS。您能否解釋一下您的方法?我主要關心的是在作業運行時添加的那些記錄。我不知道他們會發生什麼。他們是否在目前的工作和下一份工作中得到處理,或者兩者兼得? –

+0

我已經使用我們在S3中爲我們的工作流實施的解決方案更新了答案。 –

回答

1

我們已經解決了我們在AWS(亞馬遜網絡服務)中運行的工作流的這個問題,用於存儲在S3中的數據。

我們的設置:

  • 數據存儲:AWS S3
  • 數據攝取機制:水槽
  • 工作流管理:Oozie的
  • 存儲的文件狀態:MySQL的

問題:

我們使用Flume將數據導入Amazon S3。所有攝入的數據都在同一個文件夾中(S3是一個鍵/值存儲區,沒有文件夾概念,這裏的文件夾意味着所有的數據都有相同的前綴,例如/tmp/1.txt,/tmp/2.txt等。/ tmp /是關鍵字前綴)。

我們有一個ETL工作流程,計劃在一個小時內運行一次。但是,由於所有數據都被攝入了相同的文件夾,因此我們必須區分處理的未處理的文件。

例如,對於攝取第1小時的數據是:

/tmp/1.txt 
/tmp/2.txt 

當工作流首次啓動時,它應該從「1.txt的」和「2.txt」處理數據,並將其標記爲加工

如果第二小時,食入的數據是:

/tmp/3.txt 
/tmp/4.txt 
/tmp/5.txt 

然後,在文件夾中的總數據後2小時將是:

/tmp/1.txt 
/tmp/2.txt 
/tmp/3.txt 
/tmp/4.txt 
/tmp/5.txt 

由於 「1.txt的」和「2.txt」已經被處理並標記爲已處理,在第二次運行期間,作業應該只處理「3.txt」,「4.txt」和「5.txt」。

解決方案:

我們開發的庫(我們稱其爲FileManager),用於管理處理的文件列表。我們將這個庫插入到Oozie工作流中,作爲Java操作。這是工作流程的第一步。

該庫還處理忽略Flume當前正在寫入的文件。當Flume將數據寫入文件時,這些文件具有「_current」後綴。所以,這些文件被忽略處理,直到它們完全寫入。

攝入的文件是以時間戳作爲後綴生成的。對於例如「hourly_feed.1234567」。所以,文件名是按照他們創建的升序排列的。

爲了獲取未處理文件的列表,我們使用了S3的使用標記進行查詢的功能(例如,如果文件夾中有10,000個文件,如果您將標記指定爲第5,000個文件的名稱,那麼S3會返回給您文件從5001到10,000)。

我們有以下的每個文件的3種狀態:

  1. 成功 - 文件,這是成功處理
  2. 錯誤 - 文件,這被拾起進行處理,但有一個錯誤在處理這些文件。因此,需要進行處理
  3. IN_PROGRESS重新拾起這些文件 - 文件已經拿起了處理,目前正由一個工作

對每個文件進行處理,我們存儲以下詳細MySQL數據庫:

  • 文件名
  • 最後修改時間 - 我們用這個來處理一些極端情況的文件
  • 狀態(IN_PROGRESS成功錯誤

FileManager暴露的以下接口:

  • GetLatestFiles:返回的最新未處理的文件
  • UpdateFileStatus名單:處理後文件,更新文件的狀態

以下是跟着來識別文件的步驟,而尚未處理:

  1. 查詢數據庫(MySQL的),檢查最後文件,其中有成功(查詢狀態: order by created desc)。
  2. 如果第一步驟返回一個文件,然後用設置爲最後成功處理的文件的文件標記查詢S3。這將返回上次成功處理文件後攝入的所有文件。
  3. 也查詢數據庫,以檢查是否有錯誤狀態的任何文件。這些文件需要重新處理,因爲以前的工作流程沒有成功處理它們。
  4. 返回從步驟2和3所獲得的文件列表(返回它們之前,標記爲IN_PROGRESS他們的狀態)。
  5. 作業後成功完成更新所有處理過的文件的狀態成功。如果是在處理文件錯誤,然後更新爲錯誤所有文件的狀態(這樣他們可以撿起來處理接下來的時間)

我們使用Oozie的工作流管理。 Oozie的工作流有以下步驟:

  1. 步驟1:取下一組文件要被處理時,標記每個它們的狀態的作爲IN_PROGRESS,並將它們傳遞到下一階段
  2. 步驟2:處理的文件
  3. 步驟3:更新的處理的狀態(SUCCESSERROR

德duplicat離子: 當你實現這樣的庫,有重複的記錄的可能性(在某些角落情況下,同一個文件可以被用於處理拿起兩次)。我們實施了一個去重複邏輯來刪除重複的記錄。

+0

非常感謝您爲您詳細解答。它確實有幫助。 –

0

您可以使用日期時間重命名結果文檔,然後您的程序可以根據文檔的名稱處理文檔。

0

檢查上次運行時間戳的驅動程序是很好的方法,但爲了存儲上次運行時間戳,可以使用HDFS中的臨時文件。