我使用pysparkpyspark localiterator監聽器已經停止
我用這個代碼:
a = rdd.map(lambda x: function).toLocalIterator()
from collections import Counter
c = sum(a,Counter())
,並出現以下錯誤
錯誤LiveListenerBus:SparkListenerBus已經停止!滴加 事件
SparkListenerStageCompleted([email protected])
WARN的Utils:在抑制異常最後:在 重置連接:連接復位 java.net.SocketException異常java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115) at java.net.SocketOutputStream.write(SocketOutputStream.java:155)at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java。 io.BufferedOutputStream.flus h(BufferedOutputStream.java:140) at java.io.DataOutputStream.flush(DataOutputStream.java:123)at java.io.FilterOutputStream.close(FilterOutputStream.java:158)at org.apache.spark.api。 python.PythonRDD $$ anon $ 2 $$ anonfun $ run $ 2.apply $ mcV $ sp(PythonRDD.scala:707) at org.apache.spark.util.Utils $ .tryWithSafeFinally(Utils.scala:1346) at org.apache.spark.api.python.PythonRDD $$ anon $ 2.run(PythonRDD.scala:706) Suppressed:java.net.SocketException:損壞的管道(寫入失敗)在 java.net.SocketOutputStream.socketWrite0(Native方法)java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111) at java.net.SocketOutputStream.write(SocketOutputStream.java:155) 在 java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 在java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) 在java.io.FilterOutputStream.close(FilterOutputStream.java:158) 在java.io.FilterOutputStream.close(FilterOutputStream.java:159) ... 3個
當我使用這個代碼,而不是,它讓我正確的結果,並沒有錯誤
c = Counter()
for i,pair in a:
c+=Counter(pair)
我試圖玩rdd地圖中的分區。
沒有用。
這兩個代碼段應該以相同的方式工作,有什麼區別?爲什麼第一個不能工作?
感謝