我有多個Java進程。每個進程將生成一個定期包含新數據的數據文件。Spark多個文件處理
我需要Spark來讀取這些文件並將數據轉儲到數據庫中。 我有以下相對於上述要求的問題 -
- 有什麼辦法星星之火可以知道它已處理1000條記錄,並有從1001每次挑選處理文件的時間開始?
- 或者我必須清除文件一旦Spark處理它1000個記錄,每次文件應該只包含未處理的記錄。在這種情況下,我怎樣才能在同一個文件中讀寫記錄。
由於我有連續的數據生成,我不能等待Spark完成它的工作,然後讓文件加載新數據。
我有多個Java進程。每個進程將生成一個定期包含新數據的數據文件。Spark多個文件處理
我需要Spark來讀取這些文件並將數據轉儲到數據庫中。 我有以下相對於上述要求的問題 -
由於我有連續的數據生成,我不能等待Spark完成它的工作,然後讓文件加載新數據。
在閱讀完您的問題後,我認爲您應該使用Spark Streaming並指定HDFS/S3,Kafka或flume作爲源代碼。
最好的辦法是改變你的Java進程發佈卡夫卡的記錄,並寫一個火花流代碼來讀取這些記錄。
你可以在這裏閱讀更多的細節: -
https://spark.apache.org/docs/latest/streaming-programming-guide.html
如果這是不可能的,有一兩件事你可以做的是檢查每個文件的最後修改日期和只讀,其最後的修改過的文件日期距離當前時間少於2分鐘。如果當前日期時間爲2017年6月15日上午8:00您需要確保您的代碼只讀取上次修改日期小於06/15/2017 07:58 AM的文件。通過這種方式,您可以確保只讀取那些未被Java進程處理的文件。即您目前不附加新數據。
您只會閱讀最近2分鐘內沒有任何活動的文件。 2分鐘的時間差是可以改變的,你可以根據你的代碼邏輯改變它。
謝謝,考慮到使用基於上次修改時間戳的讀取文件的邏輯將無法解決我在達到要處理的文件中的記錄的要求。
我發現的一種方法是使用Java 1.7引入的SeekableByteChannel,使用它可以跟蹤文件中您正在編寫新記錄並將其傳遞給Spark層的位置。 Spark然後可以從該位置讀取記錄並處理它。
我假設你只會讀取一次文件。如果這是真的,那麼你不需要從特定位置讀取文件。只有當進程停止寫入文件時,纔會讀取該文件。 – Max08
您是否正在閱讀HDFS的這些文件?你可以使用水槽和火花流? – Max08
不,我有一些限制,因爲我不能通過卡夫卡整合我的過程。使用火花流媒體是次要的事情..我不認爲我需要使用,因爲我可以使用Spark RDD API讀取文件。正如我所說,這個文件將不斷獲得新的數據,我的Spark程序必須不斷從同一個文件中讀取新的數據 – Techie