2015-04-30 40 views
2

我是Spark新手,試圖創建一些簡單的東西。 我有一個2列的文件:日期和值。 對於每第6次約會,我想使用線性迴歸給出基於之前5個值的投影。 然後將它寫出到一個文件中,並將實際值與預計值之間的誤差寫入第3列。Spark中的第一個代碼(Python)

到目前爲止,我有這樣的:

from pyspark import SparkContext, SparkConf 
from datetime import datetime 
from timeit import timeit 

def after(line, date): 
    current = line.split(';')[0] 
    currentDate = datetime.strptime(current, '%Y.%m.%d') 
    if currentDate > datetime.strptime(date, '%Y.%m.%d'): 
    return True 
    else: 
    return False 

def getValue(line): 
    return float(line.split(';')[1]) 

conf = SparkConf().setAppName("test").setMaster("local") 
sc = SparkContext(conf=conf) 

data = sc.textFile('./AAPL.csv') 

average = latest_data.reduce(lambda l1, l2: l1+l2)/latest_data.count() 

printed_data = data.filter(lambda line: after(line, u'2015.03.27')).map(lambda d: [getValue(d), average, (getValue(d)-average)*(getValue(d)-average)]) 

printed_data.saveAsTextFile('./result.txt') 

我的問題是我不知道如何在星火創建循環類的東西。我的意思是,如果我有,例如,10臺計算機上工作,我想計算並行運行...

而作爲下一步,我只想得到result.txt中的錯誤的總和文件。 (只有這一個號碼。)

+0

如果你想做一些簡單的事情,我建議你改變這個問題。難度來自連續6批次的批次。 'sc.textFile'是基於行的。很可能有些批次將分佈在不同的分區,不同的機器上(如果您有多個分區)。你可能會看到這個問題。 –

+0

@DanielDarabos我認爲你可以flatMap到一個鍵/值:鍵(行的索引),值(日期/值)。然後按每個整數6個/ 6組配對(前6個,直到6/6 = 1,12/6 = 2等)。然後做你的計算,這將更分散。 – theMadKing

回答

0

檢查sc.parallelize看看這是你想要的。

data = sc.textFile('./AAPL.csv') 

後,你可能想要做像

distData = sc.parallelize(data) 

distDAta = sc.parallelize(data, 10) // say you want to split those 10 piece and run it (regardless how your environment actually can though...) 
0

我假設你有興趣把它並行計算用的火花。因此,讓RDD中的每個元素都相互獨立是非常重要的。將每六行/記錄放入一個元素非常重要。正如MadKing指出的那樣,不需要使用任何迴路,因爲除了某些功能,RDD的順序保證爲。我個人更喜歡使用RDD索引而不是#range()。

以下是用於將讀取RDD轉換爲此類計算的僞代碼。基本上,1)創建第#周,並將其用作聚合鍵以用於以後的聚合2)對RDD的單個元素的值進行線性迴歸。

首先讀取文件,您需要編寫自己的解析器函數將csv的每一行分割爲日期和值的元組。並使用zipWithIndex()來獲取行號。

lineIndexRDD = sc.textFile('./AAPL.csv').map(parser).zipWithIndex() 
# lineIndexRDD is now (date, value, line#) 

接着,映射線#到周#,所以每6行將具有由6 devided行號相同的星期編號,並且該週數可以用來聚集。

weekRDD = lineIndexRDD.map(lambda (date, value, lineNum):(lineNum/6, (date, value))).groupByKey() 
# weekIndexRDD is now (week#, [(date1, value1),(date2, value2),...(date6, value6)]) 

現在因爲RDD的每一個元素,你需要編寫自己的線性迴歸函數來計算值輸入數組中,並返回預測和錯誤,你可以做線性迴歸。

lrRDD = weekRDD.map(myLinearRegression) 
#your LinearRegression function should take a tuple of (week#, 6daysRecordsArray) and output a tuple of (week#, prediction, error, 6daysRecordsArray) 
# lrRDD is now (week#, prediction, error, [(date1, value1),(date2, value2),...(date6, value6)]) 

最後,你可以重新格式化RDD到你想要的最終格式,並把它寫下來,這是非常羅嗦以下,但你可以寫一個輔助函數它。我不確定你想在第6天約會,所以請仔細檢查一下。

finalRDD = lrRDD 
.flatMap(lambda(weekNum, prediction, error, [(d1, v1),(d2, v2),...(d6, v6)]):(d1, v1),(d2, v2),...(d5, v5),(d6, v6, prediction, error) 
).saveAsTextFile(outputFile.name) 
#don't forget to insert newline (\n) in between!