2016-12-14 26 views
0

我正在使用PySpark處理溫度預測數據。在PySpark中的groupByKey之後減少ResultIterable對象

以下列格式的原始溫度數據:

station;date;time,temperature;quality 
102170;2012-11-01;06:00:00;6.8;G 
102185;2012-11-02;06:00:00;5.8;G 
102170;2013-11-01;18:00:00;2.8;G 
102185;2013-11-01;18:00:00;7.8;G 

目標結果是越來越每年的最小/最大溫度,在該站所提到的,如下所示:

year;station;max_temp 
2013;102185;7.8 
2012;102170;6.8 

我現在的代碼如下:

sc = SparkContext(appName="maxMin") 
lines = sc.textFile('data/temperature-readings.csv') 
lines = lines.map(lambda a: a.split(";")) 
lines = lines.filter(lambda x: int(x[1][0:4]) >= 1950 and int(x[1][0:4]) <= 2014) 
temperatures = lines.map(lambda x: (x[1][0:4], (x[0], float(x[3])))) 

到目前爲止,結果如下:

temperatures.take(4) 

(2012, (102170,6.8)) 
(2012, (102185,5.8)) 
(2013, (102170,2.8)) 
(2013, (102185,7.8)) 

通過關鍵分組後,隨着如下:

temperatures = temperatures.groupByKey() 
temperatures.take(2) 

[(u'2012', <pyspark.resultiterable.ResultIterable object at 0x2a0be50>), 
(u'2013', <pyspark.resultiterable.ResultIterable object at 0x2a0bc50>)] 

所以,我怎樣才能減少這些resultiterable對象與最低或最高溫度只得到元素。

+0

是否有使用'rdd'而不是'DataFrame'的原因? – mtoto

+0

你認爲這會有所作爲嗎? –

回答

1

只是不。使用按鍵減少:

lines.map(lambda x: (x[1][0:4], (x[0], float(x[3])))).map(lambda x: (x, x)) \ 
    .reduceByKey(lambda x, y: (
     min(x[0], y[0], key=lambda x: x[1]), 
     max(x[1], y[1], , key=lambda x: x[1])))