2016-12-28 80 views
0

我是Spark新手。我有一個簡單的pyspark腳本。它讀取一個json文件,將其平滑並將其作爲parquet壓縮文件寫入S3位置。PySpark輸出文件數

讀取和轉換步驟運行速度非常快,並且使用50個執行人(我在conf設置)。但寫入階段需要很長時間,只寫入一個大文件(480MB)。

怎樣的文件的數量保存的決定? 寫操作可以以某種方式加快嗎?

感謝, 拉姆。

回答

1

文件輸出的數目等於的RDD被保存分區的數量。在此示例中,RDD被重新分區以控制輸出文件的數量。

嘗試:

repartition(numPartitions) - 洗牌的數據中隨機RDD到 創造更多或更少的分區並在它們之間進行平衡。 這總是通過網絡混洗所有數據。

>>> dataRDD.repartition(2).saveAsTextFile("/user/cloudera/sqoop_import/orders_test") 

文件輸出的數目是一樣的RDD的partitionds的數目。

$ hadoop fs -ls /user/cloudera/sqoop_import/orders_test 
Found 3 items 
-rw-r--r-- 1 cloudera cloudera   0 2016-12-28 12:52 /user/cloudera/sqoop_import/orders_test/_SUCCESS 
-rw-r--r-- 1 cloudera cloudera 1499519 2016-12-28 12:52 /user/cloudera/sqoop_import/orders_test/part-00000 
-rw-r--r-- 1 cloudera cloudera 1500425 2016-12-28 12:52 /user/cloudera/sqoop_import/orders_test/part-00001 

而且檢查:coalesce(numPartitions)

source-1 | source-2


更新:

textFile method也花費 控制文件的分區的數目的可選的第二個參數。默認情況下,星火 創建該文件的每個塊一個分區(塊被 默認HDFS 64MB),但你也可以通過傳遞一個較大的值,要求有較高的一些 分區。請注意,分區的塊數不能少於 。

...但這是可能分區的最小數目,所以它們不能保證。

,所以如果你想在讀取分區,你應該使用這個....

dataRDD=sc.textFile("/user/cloudera/sqoop_import/orders").repartition(2) 
+0

謝謝!重新分配何時發生?讀取期間是否可以對RDD進行分區?還是它必須是一個單獨的步驟? – Ram

+1

@Ram - 看到更新的答案 - 如果我的努力幫助您解決問題,請接受我的答案是公認的答案(點擊正確的符號旁邊的上/下箭頭上面,也單擊向上箭頭)歡呼:) –

+1

應該提到在減少分區數量的情況下,人們應該更傾向於'重新分配'而不是'重新分配',因爲它避免了全面洗牌。這是因爲Spark知道它可以將數據保留在所需的分區數量上,只將數據從額外的節點上移走。 – user4601931