2016-12-01 49 views
1

我正在尋找能的工具文件:DIR新文件尋找一種方式來連續處理寫入HDFS

  1. 監控HDFS和處理它們,因爲它們出現。
  2. 它還應該處理作業/應用程序開始工作前目錄中的文件。
  3. 它應該有檢查點在重新啓動的情況下從其離開的地方繼續。

我看着apache spark:它可以讀取新添加的文件,並且可以處理重新啓動以便從其剩下的位置繼續。我無法找到一種方法來處理同一作業範圍內的舊文件(所以只有1和3)。

我看着apache flink:它處理舊文件和新文件。但是,一旦作業重新啓動,它將再次開始處理它們(1和2)。

這是一個應該很常見的用例。我在spark/flink中錯過了什麼使它成爲可能嗎?有沒有其他工具可以在這裏使用?

+1

Didi你認爲Apache NiFi?啊,也許你更喜歡從頭開始手工編碼所有東西...... –

回答

5

使用Flink流媒體,您可以完全按照您的建議處理目錄中的文件,並且在重新啓動它時將從其停止的位置開始處理。它被稱爲連續文件處理。

你所要做的唯一事情是1)使檢查點對您的工作和2)啓動程序:

Time period = Time.minutes(10) 
    env.readFile(inputFormat, "hdfs:// … /logs", 
       PROCESS_CONTINUOUSLY, 
       period.toMilliseconds, 
       FilePathFilter.createDefaultFilter()) 

的功能是相當新的,有在開發郵件列表中的一個積極的討論關於如何進一步改進其功能。

希望這會有所幫助!

+0

順便說一下,您必須啓用檢查點以便從您離開的位置開始。如果沒有,那麼一切都將被重新處理。 –

+0

這不工作..檢查點信息僅在某些文件處於處理過程中時存儲。完成後 - 沒有關於已經處理的信息。檢查點目錄是空的,因此重新啓動作業後,所有事情都會再次處理。 – Art

1

我建議你修改一下文件攝入併合並Kafka,這樣每次你在HDFS中放入一個新文件時,都會在卡夫卡隊列中放入一條消息。然後使用Spark streaming從隊列中讀取文件名,然後從hdfs和進程中讀取文件。

檢查指向是一個真正的痛苦,也不能保證你想要什麼。帶火花的卡夫卡將能夠確保一次語義。

Flume有一個SpoolDirSource,你也可以看看它。

相關問題