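PySpark dropping all connections. I'm trying to diagnose some strange connection problems on my Spark grid: I'm seeing a crazy number of disconnects.
I'm running something that looks like this on a distributed PySpark cluster: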
spark_context.parallelize(tasks) \
    .map(lambda kwargs: my_mapped_fn(**kwargs)) \
    .reduceByKey(my_reduce_by_key) \
    .map(lambda kv: (kv[0], my_final_map(kv[0], kv[1]))) \
    .reduce(my_final_reduce)
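I'm fairly sure the failure happens in the my_final_map part, which is why I'm suspicious of the closed transports: there are so many of them that my job fails.
Here is the error I get: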
java.io.IOException: Failed to connect to 10.12.9.117:38103
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:97)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
    at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:106)
    at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:92)
    at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:579)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:63)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:63)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:62)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: 10.12.9.117:38103
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:640)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    ... 1 more
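"Connection refused" generally means nothing was accepting connections on that host and port when the request arrived. As a rough first check, one could probe the address from the driver machine using only the standard library. This is a sketch, not a Spark API: `can_connect` is a hypothetical helper name, and the 3-second timeout is an arbitrary assumption.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    Hypothetical helper for illustration only; it is not part of Spark.
    """
    try:
        # create_connection raises OSError (e.g. ConnectionRefusedError,
        # a timeout) when the connection cannot be established.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, `can_connect("10.12.9.117", 38103)` probes the address from the trace. Block-manager ports are typically chosen at random and disappear when the executor exits, so a `False` result after the job has ended says little; the probe is most informative while the job is still running.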
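Why do you call "collect()" at the end of the chain? – Yaron
Please share the error you are getting, the amount of data you are trying to process, and the size of the cluster. The source of the problem **might be** collect(), which sends a large amount of data to the driver. – Yaron
Hi @Yaron, thanks for your help! I double-checked: I switched the collect to a reduce step and removed the local variable. Still seeing the error, though :( –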
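The collect()-versus-reduce point from the comments can be illustrated without a cluster. The following is a pure-Python model, not Spark code: `partitions`, `collect`, and `reduce_rdd` are hypothetical stand-ins that only mimic how many values end up on the driver in each case.

```python
from functools import reduce

# Hypothetical stand-in for an RDD: a list of partitions, each a list of records.
partitions = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]


def collect(parts):
    """Model of collect(): every record from every partition reaches the driver."""
    return [record for part in parts for record in part]


def reduce_rdd(parts, fn):
    """Model of reduce(fn): each non-empty partition is reduced where it lives,
    so only one partial result per partition reaches the driver, where the
    partials are combined."""
    partials = [reduce(fn, part) for part in parts if part]
    return reduce(fn, partials)


def add(a, b):
    return a + b


records_on_driver = collect(partitions)  # all 9 records are shipped
total = reduce_rdd(partitions, add)      # only 3 partial sums are shipped
```

Switching from collect to reduce only lowers driver traffic when the reduced value is itself small; if my_final_reduce builds up a large object, each partial result is still shipped to the driver in full.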