
I've been able to get PySpark running the linear regression example on a test cluster using Anaconda. It's very cool. Simplifying the Python lambda code in PySpark would make it easier to understand.

My next step is to make the code more template-like for our analysts. Specifically, I'd like to rewrite the lambda function below as a regular function so that it is more approachable at our current Python skill level. I've made many attempts, but the combination of map, lambda, and numpy.array keeps confusing me.

data = sc.textFile("hdfs://nameservice1:8020/spark_input/linear_regression/lpsa.data") 
parsedData = data.map(lambda line: array([float(x) for x in line.replace(',', ' ').split(' ')])) 

The entire program is below. Any help is appreciated.

#!/opt/tools/anaconda/bin/python

from pyspark import SparkConf, SparkContext 
from pyspark.mllib.regression import LinearRegressionWithSGD 
from numpy import array 

conf = SparkConf() 
conf.setMaster("local") 
conf.setAppName("Python - Linear Regression Test") 
conf.set("spark.executor.memory", "1g") 

sc = SparkContext(conf = conf) 

# Load and parse the data 

data = sc.textFile("hdfs://nameservice1:8020/spark_input/linear_regression/lpsa.data") 
parsedData = data.map(lambda line: array([float(x) for x in line.replace(',', ' ').split(' ')])) 

# Build the model 
numIterations = 50 
model = LinearRegressionWithSGD.train(parsedData, numIterations) 

# Evaluate model on training examples and compute training error 
valuesAndPreds = parsedData.map(lambda point: (point.item(0), model.predict(point.take(range(1, point.size))))) 

MSE = valuesAndPreds.map(lambda (v, p): (v-p)**2).reduce(lambda x, y: x + y)/valuesAndPreds.count() 
print("training Mean Squared Error = " + str(MSE)) 

Answers

def line_to_float_array(line):
    space_separated_line = line.replace(',', ' ')
    string_array = space_separated_line.split(' ')
    float_array = map(float, string_array)
    return array(float_array)

parsedData = map(line_to_float_array, data)

Or, equivalently,

def line_to_float_array(line):
    space_separated_line = line.replace(',', ' ')
    string_array = space_separated_line.split(' ')
    float_array = [float(x) for x in string_array]
    return array(float_array)

parsedData = [line_to_float_array(line) for line in data] 
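For a quick local sanity check, the function can be called on a single line directly. A minimal sketch; the sample line below is made up, but follows the same "label,feature1 feature2 ..." layout as lpsa.data:

sample_line = "0.5,1.0 2.0 3.0"   # hypothetical line in the lpsa.data layout
print(line_to_float_array(sample_line))   # expected: array([ 0.5,  1. ,  2. ,  3. ])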

Ah, now I understand. Thanks, Amadan! – Jeb


Amadan's answer is correct for Python itself, which was the scope of the original question. However, when working with RDDs (Resilient Distributed Datasets) in Spark, the implementation looks slightly different, because Spark's map function is used instead of Python's:

# Declare functions at startup: 
if __name__ == "__main__": 
    def line_to_float_array(line):
        string_array = line.replace(',', ' ').split(' ')
        float_array = map(float, string_array)
        return array(float_array)

# 
# 
sc = SparkContext(conf = conf) 

# Load and parse the data 
data = sc.textFile("hdfs://nameservice1:8020/sparkjeb/lpsa.data") 
parsedData = data.map(line_to_float_array)
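
Following the same pattern, the remaining lambdas in the question's evaluation step could also be lifted into named functions and handed to the RDD operations. This is only a sketch: the function names are illustrative, and it assumes parsedData and the trained model from the original program are already in scope.

def label_and_prediction(point):
    # First element of the parsed array is the label; the rest are the features
    label = point.item(0)
    features = point.take(range(1, point.size))
    return (label, model.predict(features))

def squared_error(value_and_prediction):
    value, prediction = value_and_prediction
    return (value - prediction) ** 2

def add(x, y):
    return x + y

valuesAndPreds = parsedData.map(label_and_prediction)
MSE = valuesAndPreds.map(squared_error).reduce(add) / valuesAndPreds.count()
print("training Mean Squared Error = " + str(MSE))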