I just went through a PySpark training course and I'm compiling a script of example lines of code (which explains why the code block doesn't actually do anything useful as a whole). Every time I run this code, I get this error once or twice, and the line that throws it changes between runs. I've tried setting spark.executor.memory and spark.executor.heartbeatInterval, but the error persists. I've also tried putting .cache() at the end of various lines, with no change. Why does PySpark throw random "Socket is closed" errors?
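For reference, the memory and heartbeat settings were applied roughly like this (a minimal sketch; the values shown here are placeholders, not the exact ones I tried):

from pyspark import SparkConf, SparkContext

# Sketch only: how the two settings mentioned above were passed to Spark
conf = (SparkConf()
        .setMaster("local[*]")
        .setAppName("MyProcessName")
        .set("spark.executor.memory", "2g")               # placeholder value
        .set("spark.executor.heartbeatInterval", "60s"))  # placeholder value
sc = SparkContext(conf=conf)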
Error:
16/09/21 10:29:32 ERROR Utils: Uncaught exception in thread stdout writer for python
java.net.SocketException: Socket is closed
at java.net.Socket.shutdownOutput(Socket.java:1551)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply$mcV$sp(PythonRDD.scala:344)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply(PythonRDD.scala:344)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply(PythonRDD.scala:344)
at org.apache.spark.util.Utils$.tryLog(Utils.scala:1870)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:344)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
Code:
from pyspark import SparkConf, SparkContext
def parseLine(line):
    fields = line.split(',')
    return (int(fields[0]), float(fields[2]))

def parseGraphs(line):
    fields = line.split()
    return (fields[0]), [int(n) for n in fields[1:]]
# putting the [*] after local makes it run one executor on each core of your local PC
conf = SparkConf().setMaster("local[*]").setAppName("MyProcessName")
sc = SparkContext(conf = conf)
# parse the raw data and map it to an rdd.
# each item in this rdd is a tuple
# two methods to get the exact same data:
########## All of these methods can use lambda or full methods in the same way ##########
# read in a text file
customerOrdersLines = sc.textFile("file:///SparkCourse/customer-orders.csv")
customerOrdersRdd = customerOrdersLines.map(parseLine)
customerOrdersRdd = customerOrdersLines.map(lambda l: (int(l.split(',')[0]), float(l.split(',')[2])))
print customerOrdersRdd.take(1)
# countByValue groups identical values and counts them
salesByCustomer = customerOrdersRdd.map(lambda sale: sale[0]).countByValue()
print salesByCustomer.items()[0]
# use flatMap to cut everything up by whitespace
bookText = sc.textFile("file:///SparkCourse/Book.txt")
bookRdd = bookText.flatMap(lambda l: l.split())
print bookRdd.take(1)
# create key/value pairs that will allow for more complex uses
names = sc.textFile("file:///SparkCourse/marvel-names.txt")
namesRdd = names.map(lambda line: (int(line.split('\"')[0]), line.split('\"')[1].encode("utf8")))
print namesRdd.take(1)
graphs = sc.textFile("file:///SparkCourse/marvel-graph.txt")
graphsRdd = graphs.map(parseGraphs)
print graphsRdd.take(1)
# this will append "extra text" to each name.
# this is faster than a normal map because it doesn't give you access to the keys
extendedNamesRdd = namesRdd.mapValues(lambda heroName: heroName + "extra text")
print extendedNamesRdd.take(1)
# not the best example because the costars is already a list of integers
# but this should return a list, which will update the values
flattenedCostarsRdd = graphsRdd.flatMapValues(lambda costars: costars)
print flattenedCostarsRdd.take(1)
# put the heroes in ascending index order
sortedHeroes = namesRdd.sortByKey()
print sortedHeroes.take(1)
# to sort heroes by alphabetical order, we switch key/value to value/key, then sort
alphabeticalHeroes = namesRdd.map(lambda (key, value): (value, key)).sortByKey()
print alphabeticalHeroes.take(1)
# make sure that "spider" is in the name of the hero
spiderNames = namesRdd.filter(lambda (id, name): "spider" in name.lower())
print spiderNames.take(1)
# reduce by key keeps the key and performs aggregation methods on the values. in this example, taking the sum
combinedGraphsRdd = flattenedCostarsRdd.reduceByKey(lambda value1, value2: value1 + value2)
print combinedGraphsRdd.take(1)
# broadcast: this is accessible from any executor
sentData = sc.broadcast(["this can be accessed by all executors", "access it using sentData"])
# accumulator: this is synced across all executors
hitCounter = sc.accumulator(0)
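The broadcast variable and accumulator above are only created, never used. For completeness, the way I would use them looks roughly like this (a sketch only, not part of the failing script; countSpiders is just an illustrative helper, not something from the course):

# Sketch: read the broadcast value and bump the accumulator inside a transformation
def countSpiders(pair):
    heroId, name = pair
    if "spider" in name.lower():
        hitCounter.add(1)              # accumulator updates happen on the executors
    return (heroId, name)

namesRdd.map(countSpiders).count()     # count() forces evaluation so the accumulator gets populated
print hitCounter.value                 # read the accumulated total back on the driver
print sentData.value[0]                # broadcast contents are read through .value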
Can you tell at which step it throws the error? Do any of your prints work? –
You may have mixed up the source and destination ports. The default connection mode is 'any (available) >> destination port'; perhaps the default port is 80, and then you can't connect to port 80. I strongly recommend inspecting the client and server connections with Wireshark. – dsgdfg
What Spark version is this? Can you start 'pyspark' and type a few commands without errors? This is Windows, isn't it? How do you execute the code above? –