2017-05-17 75 views
2

我試圖挽救一個Spark數據幀的蜂巢表(木地板),在pySpark .saveAsTable(),但要像下面內存問題運行在:內存分配問題蜂巢表

org.apache.hadoop.hive.ql.metadata.HiveException: parquet.hadoop.MemoryManager$1: 
New Memory allocation 1034931 bytes is smaller than the minimum allocation size of 1048576 bytes. 

第一個號碼(1034931)通常在不同的運行中保持不變。我認識到第二個數字(1048576)是1024^2,但我不知道這裏的含義。

我一直在使用完全相同的技術用於其他一些項目(具有更大的DataFrames),並且它沒有問題。在這裏,我基本上是複製粘貼了進程和配置的結構,但卻遇到了內存問題!它一定是我失蹤的小事。

的Spark數據框(我們稱之爲sdf)具有結構(〜10列〜300K行,但可能是更多,如果該程序運行正常):

+----------+----------+----------+---------------+---------------+ 
| col_a_str| col_b_num| col_c_num|partition_d_str|partition_e_str| 
+----------+----------+----------+---------------+---------------+ 
|val_a1_str|val_b1_num|val_c1_num|  val_d1_str|  val_e1_str| 
|val_a2_str|val_b2_num|val_c2_num|  val_d2_str|  val_e2_str| 
|  ...|  ...|  ...|   ...|   ...| 
+----------+----------+----------+---------------+---------------+ 

Hive的表是這樣創建的:

sqlContext.sql(""" 
        CREATE TABLE IF NOT EXISTS my_hive_table (
         col_a_str string, 
         col_b_num double, 
         col_c_num double 
        ) 
        PARTITIONED BY (partition_d_str string, 
            partition_e_str string) 
        STORED AS PARQUETFILE 
       """) 

在插入數據到該表中的嘗試是使用下面的命令:

sdf.write \ 
    .mode('append') \ 
    .partitionBy('partition_d_str', 'partition_e_str') \ 
    .saveAsTable('my_hive_table') 

星火/蜂巢結構是這樣的:

spark_conf = pyspark.SparkConf() 
spark_conf.setAppName('my_project') 

spark_conf.set('spark.executor.memory', '16g') 
spark_conf.set('spark.python.worker.memory', '8g') 
spark_conf.set('spark.yarn.executor.memoryOverhead', '15000') 
spark_conf.set('spark.dynamicAllocation.maxExecutors', '64') 
spark_conf.set('spark.executor.cores', '4') 

sc = pyspark.SparkContext(conf=spark_conf) 

sqlContext = pyspark.sql.HiveContext(sc) 
sqlContext.setConf('hive.exec.dynamic.partition', 'true') 
sqlContext.setConf('hive.exec.max.dynamic.partitions', '5000') 
sqlContext.setConf('hive.exec.dynamic.partition.mode', 'nonstrict') 
sqlContext.setConf('hive.exec.compress.output', 'true') 

我曾試圖改變.partitionBy('partition_d_str', 'partition_e_str').partitionBy(['partition_d_str', 'partition_e_str']),增加內存,分割數據幀,以更小的塊,重新創建表和數據幀,但似乎沒有任何工作。我無法在網上找到任何解決方案。什麼會導致內存錯誤(我不完全理解它來自哪裏),以及如何更改我的代碼以寫入Hive表?謝謝。

+1

實木複合地板的最小頁面大小(即最小讀/寫單位)由屬性parquet.page.size定義,默認爲1048576.其試圖寫入的數據可能低於此閾值。那就是爲什麼投擲錯誤可能是?這只是我的猜測... [檢查了這一點](https://github.com/Parquet/parquet-mr/blob/fa8957d7939b59e8d391fa17000b34e865de015d/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java# L64) – Pushkr

+0

感謝您的鏈接。通過玩'parquet.page.size'和'parquet.block.size'配置,以及通過乘以我的數據的大小,但沒有運氣嘗試了你的建議。相同的錯誤:( – vk1011

回答

2

事實證明,我正在用可空字段進行分區,拋出.saveAsTable()

from pyspark.sql.types import * 

# Define schema 
my_schema = StructType(
        [StructField('col_a_str', StringType(), False), 
        StructField('col_b_num', DoubleType(), True), 
        StructField('col_c_num', DoubleType(), True), 
        StructField('partition_d_str', StringType(), False), 
        StructField('partition_e_str', StringType(), True)]) 

# Convert RDD to Spark DataFrame 
sdf = sqlContext.createDataFrame(my_rdd, schema=my_schema) 

由於partition_e_str被宣佈爲nullable=True(用於StructField第三個參數),它寫的時候有問題:當我轉換RDD到火花數據幀,我提供的模式是這樣產生的Hive表,因爲它被用作分區字段之一。我把它改爲:

# Define schema 
my_schema = StructType(
        [StructField('col_a_str', StringType(), False), 
        StructField('col_b_num', DoubleType(), True), 
        StructField('col_c_num', DoubleType(), True), 
        StructField('partition_d_str', StringType(), False), 
        StructField('partition_e_str', StringType(), False)]) 

,一切都很好再次!

課程:確保您的分區字段不爲空!