2016-11-11 62 views
0

我有一堆壓縮的文本文件的其中含有JSON對象的每一行。簡化我的工作流程如下:的RDD gziped文件爲「未壓縮的」數據幀

string_json = sc.textFile('/folder/with/gzip/textfiles/') 
json_objects = string_json.map(make_a_json) 
DataRDD = json_objects.map(extract_data_from_json) 
DataDF = sqlContext.createDataFrame(DataRDD,schema).collect() 
'''followed by some transformations to the dataframe''' 

現在,代碼工作正常。一旦數字文件不能在執行者之間平均分配,問題就會出現。

也就是說,據我瞭解,因爲火花沒有解壓的文件,然後分配行的執行者,而是每一個劊子手得到一個文件的工作。

e.g如果我有5個文件和4個執行人,第一4個文件是並行處理的,然後將第五文件。

由於第五未被並行地處理與其它4和4個之間執行人不能分割,它需要的時間相同數量的第一4在一起。

這發生在程序的每個階段。

有沒有辦法這種條塊RDD要麼變身爲RDD或數據幀,是不是?我使用的Python 3.5和2.0.1火花

+0

sc.textFile()拿起文件夾內的所有文件?你確定 ?我認爲我們需要wholeTextFiles()! –

+0

nope sc.textFile獲取文件夾中的所有內容。據我瞭解這是一個Hadoop函數。 – Thagor

回答

2

星火操作

分爲任務,或者可以並行完成的工作單位。有幾件事情需要了解sc.textFile

  1. 如果要加載多個文件,你會得到每個文件1個任務,以最低的。
  2. 如果你正在加載gzip文件,你最多隻能得到每個文件1個任務。

基於這兩個前提,您的用例將看到每個文件的一個任務。關於任務/內核比率如何影響掛鐘時間,您絕對正確:在4個內核上運行5個任務的時間大致與4個內核上的8個任務時間相同(儘管不完全正確,因爲存在散列函數並且第一個內核要完成將承擔第五項任務)。

一條經驗法則是,你應該每個核心有大約2-5個任務在你的星火集羣看到不錯的表現。但是如果你只有5個gzip文本文件,你不會看到這個。你可以試着重新分區RDD(其採用了比較昂貴的整理操作),如果你做了很多下游:

repartitioned_string_json = string_json.repartition(100, shuffle=True) 
+0

不是在這裏選擇,但你有我的投票。後續問題 - 你說每個文件1個任務,因爲gzip - 如果它沒有壓縮,它是如何工作的? – itaysk

+0

thx我會以此爲基準。主數據集大於5個文件,所以我會看看它是否對性能有積極影響。 – Thagor

+1

@itaysk:每個文件的1個任務是一個.gz文件的約束,因爲它們不可分割,這意味着如果給定開始和結束文件偏移量,則無法讀取該塊中的第一行開始。如果文件是可拆分的(例如標準文本),那麼每個輸入拆分將得到1個任務。這些輸入分割的大小由一系列設置決定,但通常爲32M-128M,具體取決於您是從hadoop還是從NFS文件系統讀取數據。 – Tim