
I am trying to query data from a database, apply some transformations to it, and save the new data in Parquet format on HDFS. I am getting an out-of-memory error when writing the Spark DataFrame out to Parquet.

Since the database query returns a large number of rows, I am fetching the data in batches and running the above process on each incoming batch.

Update 2: The batching logic is:

import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StructType, StructField, StringType}

class Batch(rows: List[String],
            sqlContext: SQLContext) {

    // The actual schema has around 60 fields; all kept as nullable strings here
    val schema = StructType(Array("name", "age", "address").map(field =>
        StructField(field, StringType, true)
    ))

    // Transform each raw document string into the field values for one Row
    val transformedRows = rows.map(doc => {

        // transformation logic (returns Array[Array[String]] type)

    }).map(row => Row.fromSeq(row.toSeq))

    val dataframe = sqlContext.createDataFrame(transformedRows.asJava, schema)

}

val sparkConf = new SparkConf().setAppName("Spark App")
val sparkContext = new SparkContext(sparkConf) 
val sqlContext = new SQLContext(sparkContext) 

// Code to query database 
// queryResponse is essentially an iterator that fetches the next batch on calling queryResponse.next 

var batch_num = 0 

while (queryResponse.hasNext) { 
    val batch = queryResponse.next 

    val batchToSave = new Batch(
          batch.toList.map(_.getDocument.toString), 
          sqlContext) 

    batchToSave.dataframe.write.parquet(batch_num + "_Parquet") 

    batch_num += 1 

} 

My Spark version is 1.6.1, and the spark-submit command is:

spark-submit target/scala-2.10/Spark\ Application-assembly-1.0.jar 

The problem is that after a certain number of batches I get a java.lang.OutOfMemoryError.

The full stack trace is:

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space 
    at java.util.Arrays.copyOfRange(Arrays.java:2694) 
    at java.lang.String.<init>(String.java:203) 
    at java.lang.StringBuilder.toString(StringBuilder.java:405) 
    at scala.StringContext.standardInterpolator(StringContext.scala:125) 
    at scala.StringContext.s(StringContext.scala:90) 
    at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:70) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56) 
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139) 
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334) 
    at app.Application$.main(App.scala:156) 
    at app.Application.main(App.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

I tried coalescing the data into a single partition, but it made no difference.

dataframe.coalesce(1).write.parquet(batch_num + "_Parquet") 

Any help would be appreciated.

Update 1

Not doing the coalesce transformation on the RDD still gives an error, but with the stack trace below. It seems to be an issue with Parquet.

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1855) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1868) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1945) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56) 
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139) 
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334) 
    at app.Application$.main(App.scala:156) 
    at app.Application.main(App.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.OutOfMemoryError: Java heap space 
    at org.apache.parquet.column.values.dictionary.IntList.initSlab(IntList.java:90) 
    at org.apache.parquet.column.values.dictionary.IntList.<init>(IntList.java:86) 
    at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:93) 
    at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:229) 
    at org.apache.parquet.column.ParquetProperties.dictionaryWriter(ParquetProperties.java:131) 
    at org.apache.parquet.column.ParquetProperties.dictWriterWithFallBack(ParquetProperties.java:178) 
    at org.apache.parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:203) 
    at org.apache.parquet.column.impl.ColumnWriterV1.<init>(ColumnWriterV1.java:83) 
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.newMemColumn(ColumnWriteStoreV1.java:68) 
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.getColumnWriter(ColumnWriteStoreV1.java:56) 
    at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:183) 
    at org.apache.parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:375) 
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:109) 
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:99) 
    at org.apache.parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:100) 
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:303) 
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262) 
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetRelation.scala:94) 
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon$3.newInstance(ParquetRelation.scala:286) 
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:129) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:255) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

No no no no no no, do not coalesce - that is the opposite of what you want to do. – GameOfThrows


[How to deal with "java.lang.OutOfMemoryError: Java heap space" error (64MB heap size)](http://stackoverflow.com/questions/37335/how-to-deal-with-java-lang-outofmemoryerror-java-heap-space-error-64mb-heap) –


If your data (across all partitions, since you are coalescing it) is larger than the available memory, you will get an OOM. In general, coalescing down to a single partition is bad practice, because it throws away most of your scalability. –
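If the coalesce(1) was only meant to limit the number of output files, one middle ground is to repartition each batch across a few partitions, so that no single write task has to hold the whole batch. This is a sketch under that assumption, not from the original post; the partition count of 8 is an arbitrary example:

// Sketch: spread each batch over a few partitions instead of one,
// so no single write task has to buffer the entire batch in memory.
// The value 8 is illustrative; tune it to the batch size and the
// memory available per executor.
batchToSave.dataframe
  .repartition(8)
  .write
  .parquet(batch_num + "_Parquet")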

Answer


Had the same problem today. It turned out that my execution plan was rather complex, and toString produced 150 MB of plan information, which, combined with Scala's string interpolation, caused the driver to run out of memory.
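One way to check whether the same thing is happening in the batching code above (a hypothetical diagnostic, not part of this answer) is to look at how large the plan description gets on the driver before each write:

// Hypothetical diagnostic: QueryExecution.toString (visible in the first
// stack trace above) is built on the driver for every write, so printing
// its length shows how much heap the plan description alone consumes.
val planDescription = batchToSave.dataframe.queryExecution.toString
println(s"Plan description size: ${planDescription.length} characters")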

You can try increasing the driver memory (I had to double it from 8 GB to 16 GB).
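For example, reusing the spark-submit invocation from the question, the driver heap can be raised with the --driver-memory flag (the 16g value is just the figure mentioned above; size it to your machine):

spark-submit --driver-memory 16g \
    target/scala-2.10/Spark\ Application-assembly-1.0.jar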
