0
現在我正在寫一個Spark流程程序來檢測數據中心網絡的異常。我嘗試使用迴歸算法。例如,我使用訓練數據集計算模型(即係數),然後如何在數據流中使用此先前計算的模型。我使用下面的連接,但得到異常。如何使用Spark Stream中的先前計算結果加入Stream RDD?
Traceback (most recent call last):
File "/home/xiuli/PycharmProjects/benchmark/parx.py", line 98, in <module>
joinedStream = testRDD.join(trainingRDD)
File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 362, in join
File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 313, in transformWith
AttributeError: 'PipelinedRDD' object has no attribute '_jdstream'
我可以看到Spark流指南給出了一個示例,但它缺乏細節。
流數據集加入
這在前面解釋 DStream.transform 操作時已經展示過。下面是將數據集與窗口流相連接的另一個示例。
# Example quoted from the Spark Streaming programming guide:
# joining a static (precomputed) RDD with a windowed stream.
dataset = ... # some RDD
windowedStream = stream.window(20)
# transform() exposes each batch of the DStream as a plain RDD,
# so the ordinary RDD.join can be applied against the static dataset.
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
以下是我的代碼:
from __future__ import print_function
import sys,os,datetime
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.context import SQLContext
from pyspark.resultiterable import ResultIterable
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
import numpy as np
import statsmodels.api as sm
def splitLine(line, delimiter='|'):
    """Parse one delimited record into ((id, hour), remaining_fields).

    The record's second field is a '%Y-%m-%d %H:%M:%S' timestamp; only
    its hour-of-day is kept so that records group by (id, hour).
    """
    fields = line.split(delimiter)
    stamp = datetime.datetime.strptime(fields[1], '%Y-%m-%d %H:%M:%S')
    key = (fields[0], stamp.hour)
    return key, fields[2:]
def reg_m(y, x):
    """Fit an ordinary-least-squares regression of y on the regressors in x.

    x is a list of equal-length lists, one inner list per regressor.
    The design matrix is assembled column by column (later regressors
    prepended) together with an intercept column, then fitted with
    statsmodels OLS.  Returns the fitted results object.
    """
    intercept = np.ones(len(x[0]))
    design = sm.add_constant(np.column_stack((x[0], intercept)))
    for regressor in x[1:]:
        design = sm.add_constant(np.column_stack((regressor, design)))
    return sm.OLS(y, design).fit()
def train(line):
    """Fit a per-key regression model from one grouped training record.

    line is (key, iterable of (reading, temperature) string pairs).
    Samples are buffered; every 4th sample, when exactly 4 readings are
    buffered, one regression row is emitted: the newest buffered reading
    becomes y, the three older ones become lag regressors x[0..2], and
    the window's first temperature is expanded into three piecewise
    features (x[3..5]).  Returns (str(key), fitted OLS coefficient list).
    """
    # Fix: the original had a dead `y, x = [], []` assignment that was
    # immediately overwritten on the next line — removed.
    y, x = [], [[], [], [], [], [], []]
    reading_tmp, temp_tmp = [], []
    for i, (reading, temperature) in enumerate(line[1]):
        # Consume a full window of 4 buffered readings at every 4th sample.
        if i % 4 == 0 and len(reading_tmp) == 4:
            y.append(reading_tmp.pop())
            x[0].append(reading_tmp.pop())
            x[1].append(reading_tmp.pop())
            x[2].append(reading_tmp.pop())
            temp = float(temp_tmp[0])
            del temp_tmp[:]
            # Piecewise temperature features: excess above 20C,
            # shortfall below 16C, extra shortfall below 5C.
            x[3].append(temp - 20.0 if temp > 20.0 else 0.0)
            x[4].append(16.0 - temp if temp < 16.0 else 0.0)
            x[5].append(5.0 - temp if temp < 5.0 else 0.0)
        reading_tmp.append(float(reading))
        temp_tmp.append(float(temperature))
    return str(line[0]), reg_m(y, x).params.tolist()
def detect(line):
    """Fit a regression on one grouped stream record for anomaly detection.

    Identical feature extraction to train(), but the key is returned
    unchanged (not stringified).  line is (key, iterable of
    (reading, temperature) string pairs); returns
    (key, fitted OLS coefficient list).
    """
    # Fix: the original had a dead `y, x = [], []` assignment that was
    # immediately overwritten on the next line — removed.
    y, x = [], [[], [], [], [], [], []]
    reading_tmp, temp_tmp = [], []
    for i, (reading, temperature) in enumerate(line[1]):
        # Consume a full window of 4 buffered readings at every 4th sample.
        if i % 4 == 0 and len(reading_tmp) == 4:
            y.append(reading_tmp.pop())
            x[0].append(reading_tmp.pop())
            x[1].append(reading_tmp.pop())
            x[2].append(reading_tmp.pop())
            temp = float(temp_tmp[0])
            del temp_tmp[:]
            # Piecewise temperature features: excess above 20C,
            # shortfall below 16C, extra shortfall below 5C.
            x[3].append(temp - 20.0 if temp > 20.0 else 0.0)
            x[4].append(16.0 - temp if temp < 16.0 else 0.0)
            x[5].append(5.0 - temp if temp < 5.0 else 0.0)
        reading_tmp.append(float(reading))
        temp_tmp.append(float(temperature))
    return line[0], reg_m(y, x).params.tolist()
if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: parx.py <checkpointDir> <trainingDataDir> <streamDataDir>", file=sys.stderr)
        exit(-1)
    checkpoint, trainingInput, streamInput = sys.argv[1:]
    sc = SparkContext("local[2]", appName="BenchmarkSparkStreaming")

    # Train once, up front: one (key, coefficient-list) pair per group.
    trainingLines = sc.textFile(trainingInput)
    trainingRDD = trainingLines.map(lambda line: splitLine(line, "|"))\
                               .groupByKey()\
                               .map(lambda line: train(line)).cache()

    ssc = StreamingContext(sc, 1)
    ssc.checkpoint(checkpoint)
    lines = ssc.textFileStream(streamInput).map(lambda line: splitLine(line, "|"))
    testRDD = lines.groupByKeyAndWindow(1, 1).map(lambda line: (str(line[0]), line[1]))

    # BUG FIX: a DStream cannot be joined directly with a plain RDD —
    # DStream.join expects another DStream, which is why
    # `testRDD.join(trainingRDD)` raised
    # "AttributeError: 'PipelinedRDD' object has no attribute '_jdstream'".
    # Per the Spark Streaming guide's stream-dataset join pattern, use
    # transform() so each batch is exposed as an RDD and joined with the
    # precomputed model RDD.
    joinedStream = testRDD.transform(lambda rdd: rdd.join(trainingRDD))
    joinedStream.pprint(20)

    ssc.start()
    ssc.awaitTermination()
我知道這是個舊問題,但你找到解決辦法了嗎? – Bg1850