
PySpark memory issue

I am running a program that involves a number of Spark parallelize operations. It runs fine for the first few iterations, but then crashes because of a memory problem. I am using Spark 2.2.0 with Python 2.7, and I am running my tests on an AWS EC2 instance with 30 GB of memory.

Here is my Spark setup:

conf = pyspark.SparkConf() 
conf.set("spark.executor.memory", '4g') 
conf.set('spark.executor.cores', '16') 
conf.set('spark.cores.max', '16') 
conf.set("spark.driver.memory",'4g') 
conf.setMaster("local[*]") 
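
For reference, here is a minimal sketch of how this configuration is presumably wired up to the call that fails in the traceback below; `ssc`, `input_json` and `flex_func` are names taken from the traceback, and the stand-ins here are only placeholders for the real application objects, which the question does not show:

import pyspark

# conf is the SparkConf built above; ssc matches the name in the traceback.
ssc = pyspark.SparkContext(conf=conf)

# Placeholder stand-ins for the application objects named in the traceback.
input_json = [{"id": i} for i in range(100)]
def flex_func(j):
    return j

# The call that fails (flex_api_post_func_spark_setup.py, line 152):
count = ssc.parallelize(input_json).map(lambda j: flex_func(j)).collect()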

And here is my error log:

Traceback (most recent call last): 
  File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1982, in wsgi_app 
    response = self.full_dispatch_request() 
  File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1614, in full_dispatch_request 
    rv = self.handle_user_exception(e) 
  File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1517, in handle_user_exception 
    reraise(exc_type, exc_value, tb) 
  File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1612, in full_dispatch_request 
    rv = self.dispatch_request() 
  File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1598, in dispatch_request 
    return self.view_functions[rule.endpoint](**req.view_args) 
  File "C:/Users/Administrator/Desktop/Flex_Api_Post/flex_api_post_func_spark_setup.py", line 152, in travel_time_est 
    count = ssc.parallelize(input_json).map(lambda j: flex_func(j)).collect() 
  File "C:\ProgramData\Anaconda2\lib\site-packages\pyspark\rdd.py", line 809, in collect 
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 
  File "C:\ProgramData\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 1160, in __call__ 
    answer, self.gateway_client, self.target_id, self.name) 
  File "C:\ProgramData\Anaconda2\lib\site-packages\py4j\protocol.py", line 320, in get_return_value 
    format(target_id, ".", name), value) 
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 13.0 failed 1 times, most recent failure: Lost task 7.0 in stage 13.0 (TID 215, localhost, executor driver): org.apache.spark.api.python.PythonException: 
Traceback (most recent call last): 
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 166, in main 
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 57, in read_command 
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 454, in loads 
    return pickle.loads(obj) 
MemoryError

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:108) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935) 
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458) 
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
    at java.lang.reflect.Method.invoke(Unknown Source) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
    at py4j.Gateway.invoke(Gateway.java:280) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Unknown Source) 
Caused by: org.apache.spark.api.python.PythonException: 
Traceback (most recent call last): 
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 166, in main 
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 57, in read_command 
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 454, in loads 
    return pickle.loads(obj) 
MemoryError 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:108) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    ... 1 more 

Answer


First, let's go over how PySpark works.

By giving each worker 16 cores, you are asking Spark to start 16 Python instances in parallel for each JVM worker: the JVM worker talks to each Python worker process over its own pipe.
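
To see this in practice, here is a small sketch (my addition, using a bare local[*] context rather than the exact configuration from the question) that lists the distinct Python worker processes a job actually runs in:

import os
import pyspark

sc = pyspark.SparkContext(conf=pyspark.SparkConf().setMaster("local[*]"))

# Each task runs inside a pyspark daemon child process; collecting the
# distinct PIDs gives a rough view of how many Python workers served the job.
pids = (sc.parallelize(range(64), 16)
          .map(lambda _: os.getpid())
          .distinct()
          .collect())
print("Python worker PIDs:", sorted(pids))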

So, with this configuration you are asking for workers with 4 GB each, and each of them runs with 16 cores. That creates a structure with one JVM that opens 16 pipes and 16 Python instances running in parallel. The error you are hitting is simply that there is not enough memory left for those Python processes to run.
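
As a rough, back-of-the-envelope illustration (the numbers below are assumptions for a 30 GB machine, not measured values):

# Illustrative arithmetic only: if roughly 4 GB is reserved for the Spark JVM
# heap and the remaining RAM were split evenly across 16 Python workers, each
# worker would get about 1.6 GB before OS and framework overhead.
total_ram_gb = 30
jvm_heap_gb = 4          # spark.driver.memory / spark.executor.memory
python_workers = 16      # one per core with this configuration
per_worker_gb = (total_ram_gb - jvm_heap_gb) / float(python_workers)
print("rough budget per Python worker: %.2f GB" % per_worker_gb)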

You probably need to reduce the number of cores per worker so that each Python process has enough memory to work with, or add more memory to the machine.
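
For example, a possible adjustment along those lines (the exact values are illustrative assumptions, not tested settings for this workload):

import pyspark

conf = pyspark.SparkConf()
conf.set("spark.executor.memory", "8g")   # more heap for the JVM worker
conf.set("spark.driver.memory", "8g")
conf.set("spark.executor.cores", "4")     # fewer Python instances in parallel
conf.set("spark.cores.max", "4")
conf.setMaster("local[4]")                # cap parallelism in local mode too
sc = pyspark.SparkContext(conf=conf)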

For more details, check here.