1
我有一個很大的(大約85GB壓縮的)s3文件,我嘗試在AWS EMR上用Spark處理(現在有一個m4.xlarge主實例和兩個m4)。 10倍擴展核心實例,每個實例具有100 GB EBS卷)。我知道gzip是一種不可拆分的文件格式,並且I'veseenitsuggested應該重新分區壓縮文件,因爲Spark最初會給RDD提供一個分區。但是,這樣做在Spark中處理一個大的gzip文件
scala> val raw = spark.read.format("com.databricks.spark.csv").
| options(Map("delimiter" -> "\\t", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")).
| load("s3://path/to/file.gz").
| repartition(sc.defaultParallelism * 3)
raw: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_c0: string, _c1: string ... 48 more fields
scala> raw.count()
,並採取一看星火應用程序UI後,我仍然看到只有一個活動的執行者(其他14個都是死的)一個任務,任務無法完成(或至少我」沒有足夠的時間等待它)。
- 這是怎麼回事?有人能幫我理解Spark在這個例子中的工作原理嗎?
- 我應該使用不同的羣集配置嗎?
- 不幸的是,我無法控制壓縮模式,但是有沒有另一種處理這種文件的方法?
我的印象是Spark在重新分區之前首先解壓文件。這不是這種情況嗎?那麼,我提到的四條鏈接是什麼? – user4601931
是的,Spark在首先整個解壓文件(80G在一個核心上)之前,它可以混洗它來增加並行性。 – Tim
好的,謝謝。你認爲我的集羣甚至能夠處理這個任務嗎?如果是這樣,如果我想解壓整個文件,重新分區,然後做進一步處理,你認爲設置'spark.dynamicAllocation.enabled = true'將確保我得到一個執行器(儘可能多的內存)到在執行處理之後,執行解壓縮,然後執行更多的執行器(內存更少但內核更多)? – user4601931