我是Spark和Hadoop生態系統的新手,已經愛上它了。 現在,我試圖將現有的Java應用程序移植到Spark。定義文件輸入的手動分割算法
該Java應用程序的結構方式如下:
- 讀取文件(S)逐個與一個自定義的解析器的類,它的輸入數據的一些繁重的計算一個
BufferedReader
。輸入文件的大小爲1到最大2.5 GB。 - 將數據存儲在內存中(在
HashMap<String, TreeMap<DateTime, List<DataObjectInterface>>>
中) - 將內存數據存儲寫出爲JSON。這些JSON文件的大小較小。
我寫了一個Scala應用程序,它處理一個工作人員的文件,但這顯然不是我可以從Spark中獲得的最大性能好處。
現在到我的問題將此移植到Spark: 輸入文件是基於行的。我通常每行有一條消息。但是,一些消息依賴於前面的行來在解析器中形成實際有效的消息。例如,它可能發生,我得到以下順序數據輸入文件:
- {時間戳}#0x033#{data_bytes} \ n
- {時間戳}#量0x034 #{data_bytes} \ n
- {時間戳}#0x035#{data_bytes} \ n
- {時間戳}#0 x0FE#{data_bytes} \ n
- {時間戳}#0x036#{data_bytes} \ n
爲了形成實際的消息從 「組合物消息」 0x036的,解析器還需要來自消息0x033,0x034和0x035的行。其他消息也可以在這些所需消息之間進入。儘管可以通過閱讀單行來解析大部分消息。
現在終於我的問題: 如何讓Spark正確地分割我的文件爲我的目的?文件不能「隨機」分割;它們必須以確保我的所有消息可以被解析並且解析器不會等待他永遠不會得到的輸入的方式進行拆分。這意味着每個組合消息(依賴於前面的行的消息)需要在一個分組中。
我想有以下幾種方式來實現正確的輸出,但我會扔一些想法,我到這個職位還有:
- 定義文件輸入手動分割算法?這將檢查分組的最後幾行不包含「大」消息[0x033,0x034,0x035]的開始。
- 但是火花想要分割文件,但也要添加一個固定數量的行(可以說50,這將確保工作)從最後一次拆分到下一次拆分。Parser類將正確處理多個數據,不會引入任何問題。
第二種方式可能會更容易,但是我不知道如何在Spark中實現它。有人能指引我走向正確的方向嗎?
在此先感謝!
爲什麼我最初開始這個問題:我以爲Spark可能會將我的文件分割成錯誤的行。 我的意思是說,當最後一行是「合成消息」的開始時,Spark可以決定進行新的分割,因此在我的示例中包含[0x033]。它會打破我的Java分析代碼,因爲我認爲每個分裂將(可能)由不同的工作人員處理,因此不能訪問先前的行。 這就是爲什麼我想我需要使用自定義輸入格式「預處理」它以確保所有這些依賴於其他行消息都可以正確解析 – j9dy
您的第二個建議可能是處理我的問題的另一種方法。感謝您提出這個問題。 我還有一些其他問題: - 我的每個文件包含多達50.000.000行。這會引入一個將行映射爲您所建議的格式的問題嗎? - 之後我如何處理每個鍵/值列表([object 33,0x033],[object 33,0x034],...)? - 當一個鍵(例如「對象33」)被命名時會發生什麼情況,總共有70%的值?這會分解爲進一步處理嗎? – j9dy
您的第一條評論:當您按照這種方式進行編程時,Spark只能_decide_在其他地方進行新的分割。即創建一個文件輸入格式,用於定義文件如何分割。當你說分離的記錄可以由不同的工作人員處理時,你是正確的。我認爲fileinputformat可能是您的問題的有效解決方案! – Gurdt