
Amazon EMR PySpark: rdd.distinct().count() failing

I am currently using an EMR cluster connected to RDS to gather two tables.

The two RDDs created are quite large, but I can perform .take(x) operations on them.

I can also perform more complex operations, such as:

info_rdd = somerdd.map(lambda x: (x[1], x[2])).groupByKey().map(some_lambda) 
apps_rdd = apps.join(info_rdd).map(lambda x: (x[0], (x[1][0], x[1][1][0], x[1][1][1]))) 

But the following operation, which counts the number of distinct users imported from RDS, does not work:

unique_users = rdd.distinct().count() 

I have tried many configurations to check whether it was a memory issue (just in case), but none of them solved the problem...

This is the error I am getting now:

Traceback (most recent call last): 
File "/home/hadoop/AppEngine/src/server.py", line 56, in <module> 
run_server() 
File "/home/hadoop/AppEngine/src/server.py", line 53, in run_server 
AppServer().run() 
File "/home/hadoop/AppEngine/src/server.py", line 45, in run 
api = create_app(self.context, self.apps, self.devices) 
File "/home/hadoop/AppEngine/src/api.py", line 190, in create_app 
engine = AppEngine(spark_context, apps, devices) 
File "/home/hadoop/AppEngine/src/engine.py", line 56, in __init__ 
self.unique_users = self.ratings.distinct().count() 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1041, in count 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1032, in sum 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 906, in fold 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 809, in collect 
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__ 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco 

File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.5 in stage 0.0 (TID 5, ip-172-31-3-140.eu-west-1.compute.internal, executor 13): ExecutorLostFailure (executor 13 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 164253 ms 
Driver stacktrace: 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) 
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
at scala.Option.foreach(Option.scala:257) 
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958) 
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
at org.apache.spark.rdd.RDD.collect(RDD.scala:934) 
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453) 
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
at py4j.Gateway.invoke(Gateway.java:280) 
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
at py4j.commands.CallCommand.execute(CallCommand.java:79) 
at py4j.GatewayConnection.run(GatewayConnection.java:214) 
at java.lang.Thread.run(Thread.java:745) 

I was referring to the exception in the message: 'ExecutorLostFailure (executor 13 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 164253 ms' – mrsrinivas

Answers


The solution was the following:

I did not have enough memory to perform the task. I changed the instance type of the core instances used in my cluster to one with more available memory (m4.4xlarge here).

Then I had to pass these parameters to spark-submit to force the memory allocation of my instances:

--driver-memory 2G 
--executor-memory 50G 

You can also add these parameters to avoid a long task failing because of heartbeat timeouts or memory allocation:

--conf spark.yarn.executor.memoryOverhead=XXX (large number such as 1024 or 4096) 
--conf spark.executor.heartbeatInterval=60s 
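
For illustration, the executor-side settings above can also be applied programmatically when the SparkContext is created. A minimal sketch, assuming standard PySpark (the app name is hypothetical); note that in YARN client mode --driver-memory must stay on the spark-submit command line, because the driver JVM is already running by the time this code executes:

from pyspark import SparkConf, SparkContext

# Sketch: the executor settings from this answer, set via SparkConf.
conf = (SparkConf()
        .setAppName("distinct-count-job")                   # hypothetical name
        .set("spark.executor.memory", "50g")                # --executor-memory 50G
        .set("spark.yarn.executor.memoryOverhead", "4096")  # off-heap headroom, in MB
        .set("spark.executor.heartbeatInterval", "60s"))
sc = SparkContext(conf=conf)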

ExecutorLostFailure Reason: Executor heartbeat timed out after 164253 ms

This error means that the executor did not respond for about 165 seconds and was killed (on the assumption that it was dead).

If you have a long task that keeps an executor busy and needs to complete, you can try the following settings on the spark-submit command line, which increase the heartbeat timeout considerably, as described here: https://stackoverflow.com/a/37260231/5088142
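
For illustration, the settings in that linked answer amount to raising the heartbeat interval together with the network timeout (the values below are only examples; spark.network.timeout must stay larger than spark.executor.heartbeatInterval):

--conf spark.executor.heartbeatInterval=600s 
--conf spark.network.timeout=1200s 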

Some ways to investigate this issue can be found here: https://stackoverflow.com/a/37272249/5088142


The following attempts to clarify some of the issues raised in your question.

Spark Actions vs Transformations

Spark uses lazy evaluation: when you perform a transformation, it is not executed immediately. Spark executes it only when you perform an action.

The complex-operations example you gave contains no action (i.e. nothing is executed/computed):

info_rdd = somerdd.map(lambda x: (x[1], x[2])).groupByKey().map(some_lambda) 
apps_rdd = apps.join(info_rdd).map(lambda x: (x[0], (x[1][0], x[1][1][0], x[1][1][1]))) 

Reviewing the Spark documentation on transformations, you can see that all the operations used in the example, map, groupByKey and join, are transformations.

Hence nothing has actually been done after executing these commands, as the sketch below demonstrates.
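
A minimal, self-contained sketch of this behaviour (hypothetical local context and toy data, not your RDS tables):

from pyspark import SparkContext

sc = SparkContext("local", "lazy-eval-demo")  # hypothetical local context

rdd = sc.parallelize([(1, "a"), (1, "b"), (2, "c")])

# groupByKey and map are transformations: they only record the lineage,
# no job is submitted to the cluster yet.
grouped = rdd.groupByKey().map(lambda kv: (kv[0], list(kv[1])))

# count() is an action: only now does Spark actually run the computation.
print(grouped.count())  # 2 distinct keys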

Actions

You wrote that "the two RDDs created are quite large, but I can perform .take(x) operations on them".

The difference between the take(x) and count() actions:

take(x) ends after returning the first x elements.

count() ends only after passing over the entire RDD.

The fact that the transformations you performed (as in the example above) seemed to run tells you nothing, since they were not actually executed.

Running the take(x) action cannot give any indication either, since it uses only a very small part of the RDD.
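
A small self-contained sketch of that difference (toy data and a hypothetical local context; on a genuinely huge RDD the first call stays cheap while the second scans everything):

from pyspark import SparkContext

sc = SparkContext("local", "take-vs-count-demo")  # hypothetical local context

rdd = sc.parallelize(range(1000000), numSlices=100)

# take(5) stops as soon as enough elements have been fetched,
# so it typically evaluates only the first partition(s).
print(rdd.take(5))   # [0, 1, 2, 3, 4]

# count() must pass over every partition of the entire RDD,
# which is where an undersized cluster finally fails.
print(rdd.count())   # 1000000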

Conclusion

It seems that your machine configuration does not support the size of the data you are processing, or that your code creates heavy tasks that cause the executors to hang for long periods of time (160+ seconds).

The count action was simply the first action you actually executed on your RDD, which is why the problem surfaced there.