
pyspark: uploading Spark Streaming dictionary output to MongoDB

I am using Kafka to feed logs into Spark Streaming for parsing. I have a function that turns a single log into a dictionary so that I can upload it to MongoDB. However, it keeps throwing this error:

8 ERROR ShutdownHookManager: Exception while deleting Spark temp dir: C:\Users\siyang\AppData\Local\Temp\spark-660e59cc-6331-4ed1-b932-ca64f9a1b8bd
java.io.IOException: Failed to delete: C:\Users\siyang\AppData\Local\Temp\spark-660e59cc-6331-4ed1-b932-ca64f9a1b8bd
    at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1031)
    at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:65)
    at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:62)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.util.ShutdownHookManager$$anonfun$1.apply$mcV$sp(ShutdownHookManager.scala:62)
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
    at ......

Below is the code. Does anyone know what is wrong?

from pymongo import MongoClient
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


# MongoDB 
# ---------------------------------------- 
client = MongoClient('localhost', 27017) 
db = client['ws-database'] 
collectionNm = 'ws' 
collection = db[collectionNm] 


# Spark Streaming 
# ---------------------------------------- 
sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01") 
sc.setLogLevel("WARN") 
ssc = StreamingContext(sc, 1)  # 1-second batch interval

spark = SparkSession(sc) 
kafkaStream = KafkaUtils.createStream(ssc=ssc, zkQuorum='192.168.0.xxx:2181', groupId='m1', topics={'name':1}) 


def parsing(log):
    dict = {}

    # the Kafka record is a (key, value) tuple; split the message body by line break
    log = log[1].split('\n')
    for line in log:
        if line.find('=') > 0:
            words = line.split("=")
            hash = words[0]
            value = words[1].strip().replace('"', "")
            hash = hash.strip()
            dict[hash] = value

    # upload to mongodb
    collection.insert_one(dict)


parsed = kafkaStream.map(parsing) 
parsed.pprint() 


ssc.start() 
ssc.awaitTermination() 

Does anyone have any idea what the problem is? Thank you!

Answer


map is a transformation on your DStream. What you want is foreachRDD, the generic output operation that is applied to every RDD produced by the stream. The code would look something like this:

def sendRecord(partition):
    # open the MongoDB connection on the executor, once per partition
    client = MongoClient('localhost', 27017)
    db = client['ws-database']
    collectionNm = 'ws'
    collection = db[collectionNm]

    for line in partition:
        # parsing step: build the dict from the record, then insert it
        collection.insert_one(dict)

    client.close()


kafkaStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendRecord))
kafkaStream.pprint()
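
For reference, here is a minimal end-to-end sketch of how the parsing function and foreachPartition could fit together. It is only an illustrative version, not the exact code from the question or the answer: parse_log and send_partition are made-up names, and the Zookeeper address, topic, database and collection simply reuse the placeholders from the question.

from pymongo import MongoClient
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def parse_log(message):
    # turn one multi-line 'key = "value"' log message into a dict
    doc = {}
    for line in message.split('\n'):
        if line.find('=') > 0:
            key, value = line.split('=', 1)
            doc[key.strip()] = value.strip().replace('"', '')
    return doc


def send_partition(partition):
    # one MongoDB connection per partition, created on the executor
    client = MongoClient('localhost', 27017)
    collection = client['ws-database']['ws']
    for _, message in partition:  # Kafka records arrive as (key, value) tuples
        doc = parse_log(message)
        if doc:
            collection.insert_one(doc)
    client.close()


sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 1)

kafkaStream = KafkaUtils.createStream(ssc=ssc, zkQuorum='192.168.0.xxx:2181', groupId='m1', topics={'name': 1})
kafkaStream.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition))

ssc.start()
ssc.awaitTermination()

Opening the MongoClient inside send_partition is deliberate: the connection object cannot be serialized and shipped from the driver, so it has to be created on the executors, and doing it once per partition avoids opening a new connection for every single record.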

It works like magic! – Jake