
I am currently writing a Spark Streaming program to detect anomalies in a data center network, and I am trying to use a regression algorithm. For example, I compute a model (i.e. the coefficients) from a training data set; how can I then apply this previously computed model to the data stream? I used the join below but got an exception. How do I join a stream RDD with a previously computed result in Spark Streaming?

Traceback (most recent call last): 
    File "/home/xiuli/PycharmProjects/benchmark/parx.py", line 98, in <module> 
    joinedStream = testRDD.join(trainingRDD) 
    File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 362, in join 
    File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 313, in transformWith 
AttributeError: 'PipelinedRDD' object has no attribute '_jdstream' 

I can see that the Spark Streaming guide gives an example, but it lacks details.

Stream-dataset joins

This has already been shown earlier while explaining the DStream.transform operation. Here is yet another example of joining a windowed stream with a dataset.

dataset = ... # some RDD 
windowedStream = stream.window(20) 
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset)) 
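
For reference, a minimal self-contained PySpark sketch of that pattern might look like the following; the input path, the keying by the first `|`-separated field, and the 20-second window are placeholder assumptions for illustration, not values from the guide.

# Sketch of the stream-dataset join pattern; paths, keys and window size
# are assumed placeholders, not taken from the Spark guide.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", appName="StreamDatasetJoin")
ssc = StreamingContext(sc, 1)

# Static dataset of (key, value) pairs, computed once up front.
dataset = sc.parallelize([("a", 1), ("b", 2)])

# Key the streaming records the same way, then join inside transform(),
# which hands each micro-batch to the function as an ordinary RDD.
stream = ssc.textFileStream("/tmp/stream-input") \
            .map(lambda line: (line.split("|")[0], line))
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
joinedStream.pprint()

ssc.start()
ssc.awaitTermination()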

Here is my code:

from __future__ import print_function 
import sys,os,datetime 

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.sql.context import SQLContext 
from pyspark.resultiterable import ResultIterable 
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD 
import numpy as np 
import statsmodels.api as sm 


def splitLine(line, delimiter='|'):
    # Key each record by (id, hour); the remaining fields are the values.
    values = line.split(delimiter)
    st = datetime.datetime.strptime(values[1], '%Y-%m-%d %H:%M:%S')
    return (values[0], st.hour), values[2:]

def reg_m(y, x):
    # Ordinary least squares with multiple regressors via statsmodels.
    ones = np.ones(len(x[0]))
    X = sm.add_constant(np.column_stack((x[0], ones)))
    for ele in x[1:]:
        X = sm.add_constant(np.column_stack((ele, X)))
    results = sm.OLS(y, X).fit()
    return results

def train(line):
    # Build the target vector y and six feature columns x from groups of
    # four consecutive readings plus the matching temperature.
    y, x = [], [[], [], [], [], [], []]
    reading_tmp, temp_tmp = [], []
    i = 0
    for reading, temperature in line[1]:
        if i % 4 == 0 and len(reading_tmp) == 4:
            y.append(reading_tmp.pop())
            x[0].append(reading_tmp.pop())
            x[1].append(reading_tmp.pop())
            x[2].append(reading_tmp.pop())
            temp = float(temp_tmp[0])
            del temp_tmp[:]
            x[3].append(temp - 20.0 if temp > 20.0 else 0.0)
            x[4].append(16.0 - temp if temp < 16.0 else 0.0)
            x[5].append(5.0 - temp if temp < 5.0 else 0.0)
        reading_tmp.append(float(reading))
        temp_tmp.append(float(temperature))
        i = i + 1
    return str(line[0]), reg_m(y, x).params.tolist()




def detect(line):
    # Same feature extraction as train(); only the returned key differs
    # (the raw key instead of its string form).
    y, x = [], [[], [], [], [], [], []]
    reading_tmp, temp_tmp = [], []
    i = 0
    for reading, temperature in line[1]:
        if i % 4 == 0 and len(reading_tmp) == 4:
            y.append(reading_tmp.pop())
            x[0].append(reading_tmp.pop())
            x[1].append(reading_tmp.pop())
            x[2].append(reading_tmp.pop())
            temp = float(temp_tmp[0])
            del temp_tmp[:]
            x[3].append(temp - 20.0 if temp > 20.0 else 0.0)
            x[4].append(16.0 - temp if temp < 16.0 else 0.0)
            x[5].append(5.0 - temp if temp < 5.0 else 0.0)
        reading_tmp.append(float(reading))
        temp_tmp.append(float(temperature))
        i = i + 1
    return line[0], reg_m(y, x).params.tolist()




if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: parx.py <checkpointDir> <trainingDataDir> <streamDataDir>", file=sys.stderr)
        exit(-1)

    checkpoint, trainingInput, streamInput = sys.argv[1:]
    sc = SparkContext("local[2]", appName="BenchmarkSparkStreaming")

    # Fit one regression model per key from the static training data.
    trainingLines = sc.textFile(trainingInput)
    trainingRDD = trainingLines.map(lambda line: splitLine(line, "|"))\
                               .groupByKey()\
                               .map(lambda line: train(line)).cache()

    ssc = StreamingContext(sc, 1)
    ssc.checkpoint(checkpoint)
    lines = ssc.textFileStream(streamInput).map(lambda line: splitLine(line, "|"))

    testRDD = lines.groupByKeyAndWindow(1, 1).map(lambda line: (str(line[0]), line[1]))
    # This is the line that raises the AttributeError above: testRDD is a
    # DStream, while trainingRDD is a plain RDD, so DStream.join() cannot
    # handle it.
    joinedStream = testRDD.join(trainingRDD)
    joinedStream.pprint(20)

    ssc.start()
    ssc.awaitTermination()

I know this is old, but did you find a solution? – Bg1850

Answer


Based on the documentation you mentioned, try:

testRDD.transform(lambda rdd: rdd.join(trainingRDD)) 
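
Applied to the code in the question, the join and the print would then look like the sketch below (the variable names are the ones already defined there; only the join line changes):

# testRDD is a DStream and trainingRDD is a plain RDD computed once from the
# training data, so the join has to happen inside transform(), which exposes
# each micro-batch as an ordinary RDD.
joinedStream = testRDD.transform(lambda rdd: rdd.join(trainingRDD))
joinedStream.pprint(20)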