2016-03-02 119 views
1

我有一個在yarn上運行的spark作業,它可以處理大約150GB的數據集並執行多個shuffle操作並最終將數據存儲到hbase中。它在saveAsHadoopDataset保持失敗在報告高GC活動後,基本上多個執行程序在此階段失敗。但是,執行程序日誌,驅動程序日誌或節點管理器日誌均不會顯示任何OutOfMemory錯誤或GC超額開銷超出錯誤或超出錯誤的內存限制。我也沒有看到Executor失敗的其他原因以及spark ui。由於某些未知原因導致Spark作業在saveAsHadoopDataset階段由於丟失執行程序而失敗

val hConf = HBaseConfiguration.create 
hConf.setInt("hbase.client.scanner.caching", 10000) 
hConf.setBoolean("hbase.cluster.distributed", true) 
new PairRDDFunctions(hbaseRdd).saveAsHadoopDataset(jobConfig) 

驅動程序日誌:

Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, Job aborted due to stage failure: Task 388 in stage 22.0 failed 4 times, most recent failure: Lost task 388.3 in stage 22.0 (TID 32141, maprnode5): ExecutorLostFailure (executor 5 lost) 
Driver stacktrace: 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 388 in stage 22.0 failed 4 times, most recent failure: Lost task 388.3 in stage 22.0 (TID 32141, maprnode5): ExecutorLostFailure (executor 5 lost) 
Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1124) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:310) 
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1065) 

執行人日誌:

16/02/24 11:09:47 INFO executor.Executor: Finished task 224.0 in stage 8.0 (TID 15318). 2099 bytes result sent to driver 
16/02/24 11:09:47 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 15333 
16/02/24 11:09:47 INFO executor.Executor: Running task 239.0 in stage 8.0 (TID 15333) 
16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Getting 125 non-empty blocks out of 3007 blocks 
16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Started 14 remote fetches in 10 ms 
16/02/24 11:11:47 ERROR server.TransportChannelHandler: Connection to maprnode5 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong. 
16/02/24 11:11:47 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from maprnode5 is closed 
16/02/24 11:11:47 ERROR shuffle.OneForOneBlockFetcher: Failed while starting block fetches 
java.io.IOException: Connection from maprnode5 closed 
     at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104) 
     at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) 
     at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) 
     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
     at java.lang.Thread.run(Thread.java:744) 
16/02/24 11:11:47 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3) for 6 outstanding blocks after 5000 ms 
16/02/24 11:11:52 INFO client.TransportClientFactory: Found inactive connection to maprnode5, creating a new one. 
16/02/24 11:12:16 WARN server.TransportChannelHandler: Exception in connection from maprnode5 
java.io.IOException: Connection reset by peer 
     at sun.nio.ch.FileDispatcherImpl.read0(Native Method) 
     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) 
     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) 
     at sun.nio.ch.IOUtil.read(IOUtil.java:192) 
     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) 
     at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313) 
     at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881) 
     at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242) 
     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) 
     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) 
     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) 
     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
     at java.lang.Thread.run(Thread.java:744) 
16/02/24 11:12:16 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from maprnode5 is closed 
16/02/24 11:12:16 ERROR shuffle.OneForOneBlockFetcher: Failed while starting block fetches 
+0

您只包含一個執行程序日誌,它似乎不是來自失敗執行程序的日誌。執行程序日誌說「從maprnode5連接關閉」,那麼執行程序在maprnode5上的日誌看起來像什麼? –

+0

「執行程序日誌」是從運行'yarn logs -applicationId appid'獲得的,所以我相信這是執行程序日誌。這是我可以在日誌聚合發生的文件系統上看到的相同日誌。讓我知道我是否應該在其他地方尋找它。 – nir

+0

Hi Nir, 我使用相同的api在Hbase中插入數據 –

回答

1

因此,原來雖然火花UI說,它未能在saveAsHadoopDataSet它實際上是在第一次失敗saveAsHadoopDataSet是最後一步的階段的一步。詳細說明,spark基於窄變換的序列或組合寬變換和窄變換的序列來定義階段邊界。在我的具體情況下,序列是groupByKey(wide dep) -> mapValues(narrow dep) -> map(narrow dep),最後的地圖實際上是saveAsHadoopDataSet。執行者報告在實際混洗階段groupByKey高GC活動和內存使用情況。我將我的應用程序邏輯更改爲使用reduceByKey而不是groupByKey。現在它超級慢,但至少沒有失敗。

相關問題