3

I am trying to use a random forest model to predict a stream of examples, but it seems I cannot use the model to classify them. Here is the PySpark code, combining Spark Streaming + MLlib:

sc = SparkContext(appName="App") 

model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={}, impurity='gini', numTrees=150) 


ssc = StreamingContext(sc, 1) 
lines = ssc.socketTextStream(hostname, int(port)) 

parsedLines = lines.map(parse) 
parsedLines.pprint() 

predictions = parsedLines.map(lambda event: model.predict(event.features)) 

Running it on the cluster returns this error:

Error : "It appears that you are attempting to reference SparkContext from a broadcast " 
    Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063. 

Is there a way to use a model generated from static data to predict streaming examples?

Thank you guys, I really appreciate it!

+0

I wrote a similar question here: https://stackoverflow.com/questions/48846882/pyspark-ml-streaming –

Answer

3

Yes, you can use a model generated from static data. The problem you are seeing is not related to streaming at all. You simply cannot use a JVM-based model inside an action or a transformation (see How to use Java/Scala function from an action or a transformation? for an explanation why). Instead, you should apply the predict method to a complete RDD, for example using transform on the DStream:

from pyspark.mllib.tree import RandomForest 
from pyspark.mllib.util import MLUtils 
from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from operator import attrgetter 


sc = SparkContext("local[2]", "foo") 
ssc = StreamingContext(sc, 1) 

data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt') 
trainingData, testData = data.randomSplit([0.7, 0.3]) 

model = RandomForest.trainClassifier(
    trainingData, numClasses=2, categoricalFeaturesInfo={}, numTrees=3 
) 

(ssc 
    .queueStream([testData]) 
    # Extract features 
    .map(attrgetter("features")) 
    # Predict 
    .transform(lambda _, rdd: model.predict(rdd)) 
    .pprint()) 

ssc.start() 
ssc.awaitTerminationOrTimeout(10) 
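
Not part of the original answer, but a minimal variant of the same pipeline, sketched under the assumption that the stream still carries LabeledPoint objects and that it is built before ssc.start() is called: inside transform you can zip the labels with the predictions, the same pattern the batch MLlib examples use, so each printed element is a (label, prediction) pair.

# Hedged variant (assumed, not from the original answer): keep the true
# label next to each prediction by zipping two RDDs inside transform.
(ssc 
    .queueStream([testData]) 
    .transform(lambda rdd: rdd.map(attrgetter("label")) 
               .zip(model.predict(rdd.map(attrgetter("features"))))) 
    .pprint()) 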
+0

How would you convert the string coming from the socket into a labeled point? –
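
Not an authoritative answer to the comment above, just a minimal sketch of what the parse function from the question might look like, assuming each socket line is comma-separated with the label in the first field (that field layout is an assumption, not something stated in the question):

from pyspark.mllib.regression import LabeledPoint 

def parse(line): 
    # Assumes a line like "1.0,0.5,2.3,..." where the first field is the
    # label and the remaining fields are the numeric features.
    fields = [float(x) for x in line.split(",")] 
    return LabeledPoint(fields[0], fields[1:]) 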

+0

I wrote a similar question here: https://stackoverflow.com/questions/48846882/pyspark-ml-streaming –