2016-05-25 45 views
2

我是Spark和Hadoop生態系統的新手,已經愛上它了。 現在,我試圖將現有的Java應用程序移植到Spark。定義文件輸入的手動分割算法

該Java應用程序的結構方式如下:

  1. 讀取文件(S)逐個與一個自定義的解析器的類,它的輸入數據的一些繁重的計算一個BufferedReader。輸入文件的大小爲1到最大2.5 GB。
  2. 將數據存儲在內存中(在HashMap<String, TreeMap<DateTime, List<DataObjectInterface>>>中)
  3. 將內存數據存儲寫出爲JSON。這些JSON文件的大小較小。

我寫了一個Scala應用程序,它處理一個工作人員的文件,但這顯然不是我可以從Spark中獲得的最大性能好處。

現在到我的問題將此移植到Spark: 輸入文件是基於行的。我通常每行有一條消息。但是,一些消息依賴於前面的行來在解析器中形成實際有效的消息。例如,它可能發生,我得到以下順序數據輸入文件:

  1. {時間戳}#0x033#{data_bytes} \ n
  2. {時間戳}#量0x034 #{data_bytes} \ n
  3. {時間戳}#0x035#{data_bytes} \ n
  4. {時間戳}#0 x0FE#{data_bytes} \ n
  5. {時間戳}#0x036#{data_bytes} \ n

爲了形成實際的消息從 「組合物消息」 0x036的,解析器還需要來自消息0x033,0x034和0x035的行。其他消息也可以在這些所需消息之間進入。儘管可以通過閱讀單行來解析大部分消息。

現在終於我的問題: 如何讓Spark正確地分割我的文件爲我的目的?文件不能「隨機」分割;它們必須以確保我的所有消息可以被解析並且解析器不會等待他永遠不會得到的輸入的方式進行拆分。這意味着每個組合消息(依賴於前面的行的消息)需要在一個分組中。

我想有以下幾種方式來實現正確的輸出,但我會扔一些想法,我到這個職位還有:

  • 定義文件輸入手動分割算法?這將檢查分組的最後幾行不包含「大」消息[0x033,0x034,0x035]的開始。
  • 但是火花想要分割文件,但也要添加一個固定數量的行(可以說50,這將確保工作)從最後一次拆分到下一次拆分。Parser類將正確處理多個數據,不會引入任何問題。

第二種方式可能會更容易,但是我不知道如何在Spark中實現它。有人能指引我走向正確的方向嗎?

在此先感謝!

回答

1

我在http://blog.ae.be/ingesting-data-spark-using-custom-hadoop-fileinputformat/的博客上看到您的評論,並決定在此處提供我的意見。

首先,我不完全確定你要做什麼。幫助我在這裏:你的文件包含的行包含0x033,0x034,0x035和0x036,所以Spark會分別處理它們?而實際上這些線需要一起處理?

如果是這種情況,則不應將此解釋爲「腐敗分裂」。正如你可以在blogpost中閱讀的那樣,Spark將文件分割成可以單獨處理的記錄。默認情況下,它通過拆分換行符上的記錄來完成此操作。但在你的情況下,你的「記錄」實際上分散在多行上。所以是的,你可以使用自定義的fileinputformat。但我不確定這將是最簡單的解決方案。

您可以嘗試使用自定義的fileinputformat來解決此問題,該操作可以執行以下操作:而不是像默認的fileinputformat一樣逐行給出解析文件並跟蹤遇到的記錄(0x033,0x034等)。與此同時,您可能會過濾出像0x0FE這樣的記錄(不確定是否要在其他地方使用它們)。這樣做的結果將是Spark將所有這些物理記錄作爲一個邏輯記錄。另一方面,逐行讀取文件並使用功能鍵(例如[對象33,0x033],[對象33,0x034],...)映射記錄可能更容易。這樣,您可以使用您選擇的鍵組合這些行。

當然還有其他的選擇。無論你選擇哪個取決於你的用例。

+0

爲什麼我最初開始這個問題:我以爲Spark可能會將我的文件分割成錯誤的行。 我的意思是說,當最後一行是「合成消息」的開始時,Spark可以決定進行新的分割,因此在我的示例中包含[0x033]。它會打破我的Java分析代碼,因爲我認爲每個分裂將(可能)由不同的工作人員處理,因此不能訪問先前的行。 這就是爲什麼我想我需要使用自定義輸入格式「預處理」它以確保所有這些依賴於其他行消息都可以正確解析 – j9dy

+0

您的第二個建議可能是處理我的問題的另一種方法。感謝您提出這個問題。 我還有一些其他問題: - 我的每個文件包含多達50.000.000行。這會引入一個將行映射爲您所建議的格式的問題嗎? - 之後我如何處理每個鍵/值列表([object 33,0x033],[object 33,0x034],...)? - 當一個鍵(例如「對象33」)被命名時會發生什麼情況,總共有70%的值?這會分解爲進一步處理嗎? – j9dy

+0

您的第一條評論:當您按照這種方式進行編程時,Spark只能_decide_在其他地方進行新的分割。即創建一個文件輸入格式,用於定義文件如何分割。當你說分離的記錄可以由不同的工作人員處理時,你是正確的。我認爲fileinputformat可能是您的問題的有效解決方案! – Gurdt