2015-10-19 152 views
19

我想弄清楚獲取Spark數據框列中最大值的最佳方法。獲取Spark數據框列中最大值的最佳方法

請看下面的例子:

df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"]) 
df.show() 

它創建:

+---+---+ 
| A| B| 
+---+---+ 
|1.0|4.0| 
|2.0|5.0| 
|3.0|6.0| 
+---+---+ 

我的目標是要找到A列中的最大值(通過檢查,這是3.0)。使用PySpark,這裏是我能想到的四種方法:

# Method 1: Use describe() 
float(df.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A']) 

# Method 2: Use SQL 
df.registerTempTable("df_table") 
spark.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval'] 

# Method 3: Use groupby() 
df.groupby().max('A').collect()[0].asDict()['max(A)'] 

# Method 4: Convert to RDD 
df.select("A").rdd.max()[0] 

上述每一個給出了正確的答案,但在沒有火花分析工具的我不能告訴這是最好的。

來自直覺或經驗主義的任何想法,對於上述哪些方法在Spark運行時或資源使用方面最有效,或者是否存在比上述方法更直接的方法?

+5

方法2和3相同,使用相同的物理和優化邏輯計劃。方法4將rdd中的max應用於reduce。它可能比直接在DataFrame上運行要慢。方法1或多或少相當於2和3. – zero323

+1

@ zero323那麼'df.select(max(「A」))。collect()[0] .asDict()['max(A)']'?看起來相當於方法2,而更緊湊,也更直觀,方法3。 – desertnaut

+0

- 最慢的是方法4,因爲您對DF進行了整個列的RDD轉換,然後提取最大值; –

回答

15
>df1.show() 
+-----+--------------------+--------+----------+-----------+ 
|floor|   timestamp|  uid|   x|   y| 
+-----+--------------------+--------+----------+-----------+ 
| 1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418| 
| 1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393| 
| 1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585| 
| 1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073| 

>row1 = df1.agg({"x": "max"}).collect()[0] 
>print row1 
Row(max(x)=110.33613) 
>print row1["max(x)"] 
110.33613 

答案與method3幾乎相同。但似乎「asDict()」中的方法3,可以去除

+0

有人可以解釋爲什麼collect()[0]是需要的嗎? – jibiel

+2

@jibiel'collect()'返回一個列表(在本例中是一個單一的項目),所以你需要訪問列表中的第一個(唯一的)項目 –

2

如果一些奇蹟如何使用Scala的做,在這裏你去(使用星火2.0 +):

scala> df.createOrReplaceTempView("TEMP_DF") 
scala> val myMax = spark.sql("SELECT MAX(x) as maxval FROM TEMP_DF"). 
    collect()(0).getInt(0) 
scala> print(myMax) 
117 
6

的最大值一個數據幀的特定列可以通過使用來實現 -

your_max_value = df.agg({"your-column": "max"}).collect()[0][0]

0

備註:火花旨在對大數據工作 - 分佈式計算。示例DataFrame的大小非常小,因此實際示例的順序可以相對於小示例進行更改。

最慢:Method_1,因爲.describe( 「A」)來計算最小值,最大值,平均值,STDDEV和計數(5個計算在整個列)

介質:方法4,因爲,.rdd(DF到RDD轉換)減慢了過程。因爲邏輯非常相似,所以Spark的催化劑優化器遵循非常相似的邏輯,並且操作次數最少(獲取特定列的最大值,收集單值數據幀);而且, (.asDict()增加了一個小的額外時間比較3,2〜5)以毫秒爲單位指定羣的一個邊緣節點上

import pandas as pd 
import time 

time_dict = {} 

dfff = self.spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"]) 
#-- For bigger/realistic dataframe just uncomment the following 3 lines 
#lst = list(np.random.normal(0.0, 100.0, 100000)) 
#pdf = pd.DataFrame({'A': lst, 'B': lst, 'C': lst, 'D': lst}) 
#dfff = self.sqlContext.createDataFrame(pdf) 

tic1 = int(round(time.time() * 1000)) 
# Method 1: Use describe() 
max_val = float(dfff.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A']) 
tac1 = int(round(time.time() * 1000)) 
time_dict['m1']= tac1 - tic1 
print (max_val) 

tic2 = int(round(time.time() * 1000)) 
# Method 2: Use SQL 
dfff.registerTempTable("df_table") 
max_val = self.sqlContext.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval'] 
tac2 = int(round(time.time() * 1000)) 
time_dict['m2']= tac2 - tic2 
print (max_val) 

tic3 = int(round(time.time() * 1000)) 
# Method 3: Use groupby() 
max_val = dfff.groupby().max('A').collect()[0].asDict()['max(A)'] 
tac3 = int(round(time.time() * 1000)) 
time_dict['m3']= tac3 - tic3 
print (max_val) 

tic4 = int(round(time.time() * 1000)) 
# Method 4: Convert to RDD 
max_val = dfff.select("A").rdd.max()[0] 
tac4 = int(round(time.time() * 1000)) 
time_dict['m4']= tac4 - tic4 
print (max_val) 

tic5 = int(round(time.time() * 1000)) 
# Method 4: Convert to RDD 
max_val = dfff.agg({"A": "max"}).collect()[0][0] 
tac5 = int(round(time.time() * 1000)) 
time_dict['m5']= tac5 - tic5 
print (max_val) 

print time_dict 

結果(毫秒):

小DF(毫秒): 'm1':7096,'m2':205,'m3':165,'m4':211,'m5':180}

更大DF(ms):{'m1':10260,'m2 ':452,'m3':465,'m4':916,'m5':373}

相關問題