2017-08-17 87 views
1

我使用pysparkpyspark localiterator監聽器已經停止

我用這個代碼:

a = rdd.map(lambda x: function).toLocalIterator() 
from collections import Counter 
c = sum(a,Counter()) 

,並出現以下錯誤

錯誤LiveListenerBus:SparkListenerBus已經停止!滴加 事件

SparkListenerStageCompleted([email protected]

WARN的Utils:在抑制異常最後:在 重置連接:連接復位 java.net.SocketException異常java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115) at java.net.SocketOutputStream.write(SocketOutputStream.java:155)at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java。 io.BufferedOutputStream.flus h(BufferedOutputStream.java:140) at java.io.DataOutputStream.flush(DataOutputStream.java:123)at java.io.FilterOutputStream.close(FilterOutputStream.java:158)at org.apache.spark.api。 python.PythonRDD $$ anon $ 2 $$ anonfun $ run $ 2.apply $ mcV $ sp(PythonRDD.scala:707) at org.apache.spark.util.Utils $ .tryWithSafeFinally(Utils.scala:1346) at org.apache.spark.api.python.PythonRDD $$ anon $ 2.run(PythonRDD.scala:706) Suppressed:java.net.SocketException:損壞的管道(寫入失敗)在 java.net.SocketOutputStream.socketWrite0(Native方法)java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111) at java.net.SocketOutputStream.write(SocketOutputStream.java:155) 在 java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 在java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) 在java.io.FilterOutputStream.close(FilterOutputStream.java:158) 在java.io.FilterOutputStream.close(FilterOutputStream.java:159) ... 3個

當我使用這個代碼,而不是,它讓我正確的結果,並沒有錯誤

c = Counter() 
for i,pair in a: 
    c+=Counter(pair) 

我試圖玩rdd地圖中的分區。

沒有用。

這兩個代碼段應該以相同的方式工作,有什麼區別?爲什麼第一個不能工作?

感謝

回答

0

的問題是在驅動器內存不足,我解決它使用

conf = SparkConf() 
conf.set("spark.driver.memory", "3G") 
sc = SparkContext(conf=conf) 

盯着火花背景