
SnappyData: java.lang.OutOfMemoryError: GC overhead limit exceeded

I have 1.2 GB of ORC data on S3, and I am trying to do the following with it:

1) Cache the data on a SnappyData cluster [snappydata 0.9]

2) Run a GROUP BY query on the cached dataset

3) Compare the performance with Spark 2.0.0

I am using a 64 GB / 8-core machine, and I have configured the SnappyData cluster as follows:

$ cat locators 
localhost 

$cat leads 
localhost -heap-size=4096m -spark.executor.cores=1 

$cat servers 
localhost -heap-size=6144m 
localhost -heap-size=6144m 
localhost -heap-size=6144m 
localhost -heap-size=6144m 
localhost -heap-size=6144m 
localhost -heap-size=6144m 

I have written a small Python script that caches the ORC data from S3 and runs a simple group-by query on it, which is as follows:

from pyspark.sql.snappy import SnappyContext 
from pyspark import SparkContext,SparkConf 
conf = SparkConf().setAppName('snappy_sample') 
sc = SparkContext(conf=conf) 
sqlContext = SnappyContext(sc) 

sqlContext.sql("CREATE EXTERNAL TABLE if not exists my_schema.my_table using orc options(path 's3a://access_key:[email protected]_name/path')") 
sqlContext.cacheTable("my_schema.my_table") 

out = sqlContext.sql("select * from my_schema.my_table where (WeekId = '1') order by sum_viewcount desc limit 25") 
out.collect() 

The above script is executed with the following command:

spark-submit --master local[*] snappy_sample.py 

and I get the following error:

17/10/04 02:50:32 WARN memory.MemoryStore: Not enough space to cache rdd_2_5 in memory! (computed 21.2 MB so far) 
17/10/04 02:50:32 WARN memory.MemoryStore: Not enough space to cache rdd_2_0 in memory! (computed 21.2 MB so far) 
17/10/04 02:50:32 WARN storage.BlockManager: Persisting block rdd_2_5 to disk instead. 
17/10/04 02:50:32 WARN storage.BlockManager: Persisting block rdd_2_0 to disk instead. 
17/10/04 02:50:47 WARN storage.BlockManager: Putting block rdd_2_2 failed due to an exception 
17/10/04 02:50:47 WARN storage.BlockManager: Block rdd_2_2 could not be removed as it was not found on disk or in memory 
17/10/04 02:50:47 ERROR executor.Executor: Exception in task 2.0 in stage 0.0 (TID 2) 
java.lang.OutOfMemoryError: GC overhead limit exceeded 
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) 
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) 
    at org.apache.spark.sql.execution.columnar.compression.CompressibleColumnBuilder$class.build(CompressibleColumnBuilder.scala:96) 
    at org.apache.spark.sql.execution.columnar.NativeColumnBuilder.build(ColumnBuilder.scala:97) 
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:135) 
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:134) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) 
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:134) 
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:98) 
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:232) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926) 
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866) 
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926) 
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670) 
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:331) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:282) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:284) 
    at org.apache.spark.sql.execution.WholeStageCodegenRDD.compute(WholeStageCodegenExec.scala:496) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:284) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:284) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
17/10/04 02:50:47 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-2,5,main] 
java.lang.OutOfMemoryError: GC overhead limit exceeded 
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) 
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) 
    at org.apache.spark.sql.execution.columnar.compression.CompressibleColumnBuilder$class.build(CompressibleColumnBuilder.scala:96) 
    at org.apache.spark.sql.execution.columnar.NativeColumnBuilder.build(ColumnBuilder.scala:97) 
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:135) 
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:134) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) 
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:134) 
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:98) 
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:232) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926) 
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866) 
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926) 
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670) 
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:331) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:282) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:284) 
    at org.apache.spark.sql.execution.WholeStageCodegenRDD.compute(WholeStageCodegenExec.scala:496) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:284) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:284) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
17/10/04 02:50:48 INFO snappystore: VM is exiting - shutting down distributed system 

Apart from the above error, how can I check whether the data has actually been cached in the SnappyData cluster?

Answer


1) First, it does not look like your Python script is connecting to the SnappyData cluster at all; rather, it is running in local mode. In that case, the JVM launched by the Python script fails with the OOM as expected. When using Python, connect to the SnappyData cluster in "smart connector" mode:

spark-submit --master local[*] --conf snappydata.connection=locator:1527 snappy_sample.py 

The host:port above is the locator host and the port on which its Thrift server is running (1527 by default). Second, your example just uses Spark caching. If you want to use SnappyData, load the data into a column table:

from pyspark.sql.snappy import SnappySession 
from pyspark import SparkContext,SparkConf 
conf = SparkConf().setAppName('snappy_sample') 
sc = SparkContext(conf=conf) 
session = SnappySession(sc) 

session.sql("CREATE EXTERNAL TABLE if not exists my_table using orc options(path 's3a://access_key:[email protected]_name/path')") 
session.table("my_table").write.format("column").saveAsTable("my_column_table") 

out = session.sql("select * from my_column_table where (WeekId = '1') order by sum_viewcount desc limit 25") 
out.collect() 
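
To check whether the data actually landed in the column table (your last question), one simple sanity check is to list the tables and count the rows from the same session. This is just a suggestion using plain SQL against the names above, not an official cache-inspection API:

session.sql("show tables").show() 
session.sql("select count(*) from my_column_table").show() 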

Also note the use of "SnappySession" rather than the context, which has been deprecated since Spark 2.0.x. 2) When comparing against Spark caching, use "cacheTable" in a separate script and run it against upstream Spark. Note that "cacheTable" caches lazily, meaning the first query does the actual caching, so the first query run will be very slow, but subsequent ones should be faster. A sketch of such a comparison script follows.
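
Here is a minimal sketch of that separate upstream-Spark comparison script, reusing the table and query from the question (the S3 path and column names are the question's placeholders):

from pyspark.sql import SparkSession 

spark = SparkSession.builder.appName('spark_cache_compare').getOrCreate() 

# Register the S3 ORC data as a data source table (plain Spark 2.0 syntax). 
spark.sql("CREATE TABLE if not exists my_table using orc options(path 's3a://access_key:secret_key@bucket_name/path')") 

# cacheTable is lazy: the first query below performs the actual caching. 
spark.catalog.cacheTable("my_table") 

# Time this twice: the first run does the caching, the second measures cached performance. 
out = spark.sql("select * from my_table where (WeekId = '1') order by sum_viewcount desc limit 25") 
out.collect() 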

3) Update to the 1.0 release rather than using 0.9; it has many improvements. You will also need to add hadoop-aws-2.7.3 and aws-java-sdk-1.7.4 to the "-classpath" in conf/leads and conf/servers (or put them into the product's jars directory) before launching the cluster, as sketched below.
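
For example, each server line in conf/servers might look like the following, assuming the two jars are in /path/to (a placeholder; the exact flag syntax may vary across releases):

$ cat servers 
localhost -heap-size=6144m -classpath=/path/to/hadoop-aws-2.7.3.jar:/path/to/aws-java-sdk-1.7.4.jar 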
