0
我想拉動kafka數據來引發流式處理,從HDFS中加載已經建好的模型,然後使用kafka消息做出預測。Pyspark預測使用kafka直接流
我試了好幾種方法,但我因爲一個TypeError的又卡在model.predict:不能類型轉換成矢量
從卡夫卡接收到的數據是浮動逗號分隔。
這裏是我的代碼:
sc = SparkContext(appName="PythonStreamingKafkaForecast")
ssc = StreamingContext(sc, 10)
# Create stream to get kafka messages
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["my_topic"], {"metadata.broker.list": "kafka_ip"})
features = directKafkaStream.foreachRDD(lambda rdd: rdd.map(lambda s: Vectors.dense(s[1].split(","))))
model = LinearRegressionModel.load(sc, "hdfs://hadoop_ip/model.model")
#Predict
predicted = model.predict(features)
我也試過這樣:
lines = directKafkaStream.map(lambda x: x[1])
features = lines.map(lambda data: Vectors.dense([float(c) for c in data.split(',')]))
但是這一次,是功能型TransformedStream,不會對preidctions工作的...
你能告訴我我做錯了什麼嗎?
謝謝您的幫助