2016-10-05 35 views
2

我有20k個壓縮文件,大小約爲2MB,用於處理spark。我最初的想法是使用wholeTextFiles(),以便我得到文件名 - >內容元組。這很有用,因爲我需要維護這種配對(因爲處理是在每個文件基礎上完成的,每個文件代表一分鐘收集的數據)。但是,每當我需要映射/過濾/等方面的數據,並保持此文件名 - >相關聯,代碼變得醜陋即使用wholeTextFiles進行火花數據處理

Data.map(lambda (x,y) : (x, y.changeSomehow)) 

數據本身,所以每次的內容(也許並不是有效的?)文件,因爲它包含10k行數據,所以將其作爲單獨的RDD讀取會很好;然而,不能有rdd rdd(據我所知)。

有什麼辦法可以緩解這個過程嗎?任何解決方法,基本上允許我使用每個文件的內容作爲一個rdd,因此允許我做rdd.map(lambda x: change(x))沒有醜陋的跟蹤文件名(以及使用列表解析而不是轉換)?

當然,目標是保持分佈式方法,而不是以任何方式抑制它。

處理的最後一步是通過縮小來收集所有內容。

更多的背景:試圖確定在每分鐘的基礎上(近)船碰撞,然後繪製它們的路徑

+0

如果你需要速度,我建議階。 Python的速度大約慢了10倍,一方面是因爲python本來就比較慢,另一方面是因爲它必須將數據從jvm發佈到python – Reactormonk

+0

@Reactormonk我想這樣做。但是,我必須使用python腳本來解碼文件中的每一行。更確切地說,我正在談論AIS消息,因爲我只在Python中發現代碼(在給定時間的情況下,我自己寫的代碼太複雜了)。如果您知道解決方法,請告知 – Dimebag

+1

https://github.com/dma-ais/AisLib? – Reactormonk

回答

1

如果你有正常map功能(o1-> O2),你可以使用mapValues功能。你也有flatMap(o1 - > Collection())函數:flatMapValues。

它將保持密鑰(在你的情況下 - 文件名)並只更改值。

例如:

rdd = sc.wholeTextFiles (...) 
# RDD of i.e. one pair, /test/file.txt -> Apache Spark 
rddMapped = rdd.mapValues (lambda x: veryImportantDataOf(x)) 
# result: one pair: /test/file.txt -> Spark 

使用reduceByKey可以減少結果

+0

這已經是一個改進,謝謝!因此,我現在的一些行看起來像'data.mapValues(lambda x:[list comprehension])',其中list包含文件中的每一行。如果在效率和可讀性方面x是它自己的rdd而不是列表,那將是非常棒的。你是否同意,也許有一個想法如何做? – Dimebag

+0

密鑰對RDD的值不能是RDD。它可以是任何可序列化的數據結構,即可以創建案例類「DocumentContent」。 RDD就像數據轉換的邏輯計劃,它不僅僅是一個集合。驅動程序必須構建一個調用鏈來繼續進行RDD操作,嵌入式RDD會導致很多問題,這就是爲什麼RDD的RDD不被允許 –