Spark: ~100 million rows, "Size exceeds Integer.MAX_VALUE"? (This is Spark 2.0 running on a small three-machine Amazon EMR cluster.)

I have a PySpark job that loads some large text files into a Spark RDD. A count() on the RDD succeeds and returns 158,598,155.

The job then parses each line into a pyspark.sql.Row instance, builds a DataFrame, and runs another count. This second count() on the DataFrame raises an exception from deep inside Spark's own code: Size exceeds Integer.MAX_VALUE. The same job works fine on smaller data volumes. Can someone explain why/how this happens?

org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 1.0 failed 4 times, most recent failure: Lost task 22.3 in stage 1.0 (TID 77, ip-172-31-97-24.us-west-2.compute.internal): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE 
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869) 
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103) 
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91) 
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287) 
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105) 
    at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439) 
    at org.apache.spark.storage.BlockManager.get(BlockManager.scala:604) 
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661) 
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

PySpark code:

raw_rdd = spark_context.textFile(full_source_path) 

# DEBUG: This call to count() is expensive 
# This count succeeds and returns 158,598,155 
logger.info("raw_rdd count = %d", raw_rdd.count()) 
logger.info("completed getting raw_rdd count!!!!!!!") 

row_rdd = raw_rdd.map(row_parse_function).filter(bool) 
data_frame = spark_sql_context.createDataFrame(row_rdd, MySchemaStructType) 

data_frame.cache() 
# This will trigger the Spark internal error 
logger.info("row count = %d", data_frame.count()) 

What is the expected result of the second count()? – gsamaras

Please share the code snippet where the error occurs. – javadba

@gsamaras, basically the same as the first count. – clay

Answer

The error does not come from data_frame.count() itself; rather, the rows parsed by row_parse_function contain some integer values that do not fit the integer type declared in MySchemaStructType.

Try widening the integer type in the schema to pyspark.sql.types.LongType(), or let Spark infer the types by omitting the schema (though that slows down evaluation).
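
As a sketch of that suggestion (the field names here are assumptions, not taken from the post), widening the integer columns to 64-bit LongType would look like this:

from pyspark.sql.types import StructType, StructField, StringType, LongType

# Widen integer columns to 64-bit LongType so that parsed values too large
# for a 32-bit IntegerType still fit the declared schema.
MySchemaStructType = StructType([
    StructField("id", LongType(), True),      # was IntegerType()
    StructField("name", StringType(), True),
])

data_frame = spark_sql_context.createDataFrame(row_rdd, MySchemaStructType)

# Alternatively, omit the schema and let Spark infer types from the Row objects
# (simpler, but slower to evaluate):
# data_frame = spark_sql_context.createDataFrame(row_rdd)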

'row_parse_function' now explicitly checks for out-of-range values. The exception occurs in 'FileChannelImpl.map', which makes no sense for an out-of-range parsing error. – clay

@clay Can you post 'row_parse_function' and 'MySchemaStructType'? – antonislav