0
我使用Kafka解析日誌到Spark-Streaming。我有一個函數可以將單個日誌輸出到字典中,以便我可以上傳到mongoDB。然而它保持promptng我這樣的錯誤:pyspark:上傳spark-streaming字典輸出到MongoDB
8 ERROR ShutdownHookManager: Exception while deleting Spark temp
dir: C:\Users\siyang\AppData\Local\Temp\spark-660e59cc-6331-4ed1-b932-ca64f9a1b
8bd
java.io.IOException: Failed to delete: C:\Users\siyang\AppData\Local\Temp\spark-
660e59cc-6331-4ed1-b932-ca64f9a1b8bd
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1031)
at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$m
cV$sp$3.apply(ShutdownHookManager.scala:65)
at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$m
cV$sp$3.apply(ShutdownHookManager.scala:62)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimize
d.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.util.ShutdownHookManager$$anonfun$1.apply$mcV$sp(Shu
tdownHookManager.scala:62)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala
:216)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$ano
nfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
at......
以下是代碼。有誰知道什麼是錯的?
from pymongo import MongoClient
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
# MongoDB
# ----------------------------------------
client = MongoClient('localhost', 27017)
db = client['ws-database']
collectionNm = 'ws'
collection = db[collectionNm]
# Spark Streaming
# ----------------------------------------
sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 1) # listen every 1 second
spark = SparkSession(sc)
kafkaStream = KafkaUtils.createStream(ssc=ssc, zkQuorum='192.168.0.xxx:2181', groupId='m1', topics={'name':1})
def parsing(log):
dict = {}
# split message by line break
log = log[1].split('\n')
for line in log:
if line.find('=') > 0
words = line.split("=")
hash = words[0]
value = words[1].strip().replace('"',"")
hash = hash.strip()
dict[hash] = value
# upload to mongodb
collection.insert_one(dict)
parsed = kafkaStream.map(parsing)
parsed.pprint()
ssc.start()
ssc.awaitTermination()
任何人有什麼想法是什麼問題?謝謝!
它的工作方式像魔術一樣! – Jake