2017-05-25 82 views
0

我有一些csv格式的城市的最高和最低溫度的示例數據。Spark Python - 如何使用按鍵減少以獲得最小/最大值

Mumbai,19,30 
Delhi,5,41 
Kolkata,20,40 
Mumbai,18,35 
Delhi,4,42 
Delhi,10,44 
Kolkata,19,39 

我想找出記錄使用Python中的火花腳本每個城市都時的最低溫度。

這裏是我的腳本

cityTemp = sc.textFile("weather.txt").map(lambda x: x.split(',')) 

# convert it to pair RDD for performing reduce by Key 

cityTemp = cityTemp.map(lambda x: (x[0], tuple(x[1:]))) 

cityTempMin = cityTemp.reduceByKey(lambda x, y: min(x[0],y[0])) 

cityTempMin.collect() 

我預期的輸出結果如下

Delhi, 4 
Mumbai, 18 
Kolkata, 19 

但是腳本生成以下輸出。

[(u'Kolkata', u'19'), (u'Mumbai', u'18'), (u'Delhi', u'1')] 

如何獲得所需的輸出?

回答

4

嘗試以下解決辦法,如果你必須使用reduceByKey功能:

val df = sc.parallelize(Seq(("Mumbai", 19, 30), 
    ("Delhi", 5, 41), 
    ("Kolkata", 20, 40), 
    ("Mumbai", 18, 35), 
    ("Delhi", 4, 42), 
    ("Delhi", 10, 44), 
    ("Kolkata", 19, 39))).map(x => (x._1,x._2)).keyBy(_._1) 


    df.reduceByKey((accum, n) => if (accum._2 > n._2) n else accum).map(_._2).collect().foreach(println) 

輸出:

(Kolkata,19) 
(Delhi,4) 
(Mumbai,18) 

如果你不想做一個reduceByKey。只是一個小組跟隨min函數會給你想要的結果。

val df = sc.parallelize(Seq(("Mumbai", 19, 30), 
     ("Delhi", 5, 41), 
     ("Kolkata", 20, 40), 
     ("Mumbai", 18, 35), 
     ("Delhi", 4, 42), 
     ("Delhi", 10, 44), 
     ("Kolkata", 19, 39))).toDF("city", "minTemp", "maxTemp") 

     df.groupBy("city").agg(min("minTemp")).show 

輸出:

+-------+------------+ 
| city|min(minTemp)| 
+-------+------------+ 
| Mumbai|   18| 
|Kolkata|   19| 
| Delhi|   4| 
+-------+------------+ 
+0

但也有建議不要使用,因爲性能原因GROUPBY。所以我避免使用它。有沒有其他方法? –

+1

也爲ans添加了reducebyKey解決方案。 –

+1

@RaviChandra我認爲你所指的'groupBy'是'groupByKey'方法,由於中間的混洗,在大數據集的情況下會降低性能。這裏使用的'groupBy'是Spark SQL中的一個函數。 – philantrovert

相關問題