2017-02-04 44 views
0

我想拉動kafka數據來引發流式處理,從HDFS中加載已經建好的模型,然後使用kafka消息做出預測。Pyspark預測使用kafka直接流

我試了好幾種方法,但我因爲一個TypeError的又卡在model.predict:不能類型轉換成矢量

從卡夫卡接收到的數據是浮動逗號分隔。

這裏是我的代碼:

sc = SparkContext(appName="PythonStreamingKafkaForecast") 
ssc = StreamingContext(sc, 10) 

# Create stream to get kafka messages 
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["my_topic"], {"metadata.broker.list": "kafka_ip"}) 

features = directKafkaStream.foreachRDD(lambda rdd: rdd.map(lambda s: Vectors.dense(s[1].split(",")))) 

model = LinearRegressionModel.load(sc, "hdfs://hadoop_ip/model.model") 

#Predict 
predicted = model.predict(features) 

我也試過這樣:

lines = directKafkaStream.map(lambda x: x[1]) 
features = lines.map(lambda data: Vectors.dense([float(c) for c in data.split(',')])) 

但是這一次,是功能型TransformedStream,不會對preidctions工作的...

你能告訴我我做錯了什麼嗎?

謝謝您的幫助

回答

0

好吧,這個問題是試圖從卡夫卡讀取數據,即使該主題是空的。

這解決了我的問題:

def predict(rdd): 
    count = rdd.count() 
    if (count > 0): 
     features = rdd.map(lambda s: Vectors.dense(s[1].split(","))) 

     return features 
    else: 
    print("No data received") 

directKafkaStream.foreachRDD(lambda rdd: predict(rdd))