2016-09-21 47 views
11

我剛剛通過一個PySpark培訓課程,正在編寫示例代碼行的腳本(這解釋了代碼塊爲什麼不起作用)。每次運行此代碼時,都會出現一次或兩次此錯誤。引發它的線在運行之間改變。我試過設置spark.executor.memoryspark.executor.heartbeatInterval,但錯誤仍然存​​在。我也試過把.cache()放在各行的末尾,沒有任何變化。爲什麼PySpark會出現隨機的「套接字已關閉」錯誤?

錯誤:

16/09/21 10:29:32 ERROR Utils: Uncaught exception in thread stdout writer for python 
java.net.SocketException: Socket is closed 
     at java.net.Socket.shutdownOutput(Socket.java:1551) 
     at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply$mcV$sp(PythonRDD.scala:344) 
     at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply(PythonRDD.scala:344) 
     at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply(PythonRDD.scala:344) 
     at org.apache.spark.util.Utils$.tryLog(Utils.scala:1870) 
     at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:344) 
     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857) 
     at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269) 

代碼:

from pyspark import SparkConf, SparkContext 

def parseLine(line): 
    fields = line.split(',') 
    return (int(fields[0]), float(fields[2])) 

def parseGraphs(line): 
    fields = line.split() 
    return (fields[0]), [int(n) for n in fields[1:]] 

# putting the [*] after local makes it run one executor on each core of your local PC 
conf = SparkConf().setMaster("local[*]").setAppName("MyProcessName") 

sc = SparkContext(conf = conf) 

# parse the raw data and map it to an rdd. 
# each item in this rdd is a tuple 
# two methods to get the exact same data: 
########## All of these methods can use lambda or full methods in the same way ########## 
# read in a text file 
customerOrdersLines = sc.textFile("file:///SparkCourse/customer-orders.csv") 
customerOrdersRdd = customerOrdersLines.map(parseLine) 
customerOrdersRdd = customerOrdersLines.map(lambda l: (int(l.split(',')[0]), float(l.split(',')[2]))) 
print customerOrdersRdd.take(1) 

# countByValue groups identical values and counts them 
salesByCustomer = customerOrdersRdd.map(lambda sale: sale[0]).countByValue() 
print salesByCustomer.items()[0] 

# use flatMap to cut everything up by whitespace 
bookText = sc.textFile("file:///SparkCourse/Book.txt") 
bookRdd = bookText.flatMap(lambda l: l.split()) 
print bookRdd.take(1) 

# create key/value pairs that will allow for more complex uses 
names = sc.textFile("file:///SparkCourse/marvel-names.txt") 
namesRdd = names.map(lambda line: (int(line.split('\"')[0]), line.split('\"')[1].encode("utf8"))) 
print namesRdd.take(1) 

graphs = sc.textFile("file:///SparkCourse/marvel-graph.txt") 
graphsRdd = graphs.map(parseGraphs) 
print graphsRdd.take(1) 

# this will append "extra text" to each name. 
# this is faster than a normal map because it doesn't give you access to the keys 
extendedNamesRdd = namesRdd.mapValues(lambda heroName: heroName + "extra text") 
print extendedNamesRdd.take(1) 

# not the best example because the costars is already a list of integers 
# but this should return a list, which will update the values 
flattenedCostarsRdd = graphsRdd.flatMapValues(lambda costars: costars) 
print flattenedCostarsRdd.take(1) 

# put the heroes in ascending index order 
sortedHeroes = namesRdd.sortByKey() 
print sortedHeroes.take(1) 

# to sort heroes by alphabetical order, we switch key/value to value/key, then sort 
alphabeticalHeroes = namesRdd.map(lambda (key, value): (value, key)).sortByKey() 
print alphabeticalHeroes.take(1) 

# make sure that "spider" is in the name of the hero 
spiderNames = namesRdd.filter(lambda (id, name): "spider" in name.lower()) 
print spiderNames.take(1) 

# reduce by key keeps the key and performs aggregation methods on the values. in this example, taking the sum 
combinedGraphsRdd = flattenedCostarsRdd.reduceByKey(lambda value1, value2: value1 + value2) 
print combinedGraphsRdd.take(1) 

# broadcast: this is accessible from any executor 
sentData = sc.broadcast(["this can be accessed by all executors", "access it using sentData"]) 

# accumulator: this is synced across all executors 
hitCounter = sc.accumulator(0) 
+0

你能告訴它在哪一步返回錯誤嗎?你們有沒有打印作品? –

+0

您可能已經混淆了源端口和目標端口。默認連接模式'任何(可用)>>目標端口',或許默認端口是80,那麼你不能連接到80端口。我強烈建議您使用Wireshark檢查客戶端和服務器連接。 – dsgdfg

+0

什麼是Spark版本?你可以啓動'pyspark'並輸入一些沒有錯誤的命令嗎?這是Windows,不是嗎?你如何執行上面的代碼? –

回答

0

免責聲明:我還沒有火花的代碼庫的一部分花了足夠的時間,但讓我給你一些提示可能會導致解決方案。接下來只是解釋在哪裏尋找更多的信息,而不是解決問題的方法。


你所面臨的例外是由於所看到的代碼here(您可以通過線java.net.Socket.shutdownOutput(Socket.java:1551)時執行worker.shutdownOutput()這是看到的)一些其他的問題。

16/09/21 10:29:32 ERROR Utils: Uncaught exception in thread stdout writer for python 
java.net.SocketException: Socket is closed 
     at java.net.Socket.shutdownOutput(Socket.java:1551) 
     at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply$mcV$sp(PythonRDD.scala:344) 
     at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply(PythonRDD.scala:344) 
     at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply(PythonRDD.scala:344) 
     at org.apache.spark.util.Utils$.tryLog(Utils.scala:1870) 
     at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:344) 
     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857) 
     at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269) 

這使我相信ERROR是其他一些早期錯誤的後續行爲。

的蟒蛇名稱標準輸出作家the name of the thread是(使用EvalPythonExec物理運算符)負責Spark和pyspark(這樣你就可以執行沒有太多的改變Python代碼)之間的通信。

事實上the scaladoc of EvalPythonExec給出了很多關於pyspark在內部使用的底層通信基礎設施的信息,並且它使用套接字來連接外部的Python進程。

Python evaluation works by sending the necessary (projected) input data via a socket to an external Python process, and combine the result from the Python process with the original row.

此外,python默認情況下使用,除非使用PYSPARK_DRIVER_PYTHONPYSPARK_PYTHON(你可以在pyspark shell腳本herehere見)覆蓋。這是出現在失敗線程名稱中的名稱。

16/09/21 10:29:32 ERROR Utils: Uncaught exception in thread stdout writer for python

我建議使用以下命令在您的系統上檢查python的版本。

python -c 'import sys; print(sys.version_info)' 

That should be Python 2.7+,但可能是因爲你使用的是沒有很好地與星火測試了最新的Python。 猜測...


您應該包括pyspark應用程序執行的整個日誌而這也正是我希望找到答案。