2016-11-25 74 views
2

我有兩個大火花DataFrames,都包含座標。讓我們把他們的位置和地點:評估來自兩個數據幀的行的所有組合

loc = [('01', 0.2, 0.9), ('02', 0.3, 0.6), ('03', 0.8, 0.1)] 
locations = sqlContext.createDataFrame(loc, schema=['id', 'X', 'Y']) 

site = [('A', 0.7, 0.1), ('B', 0.3, 0.7), ('C', 0.9, 0.3), ('D', 0.3, 0.8)] 
sites = sqlContext.createDataFrame(site, schema=['name', 'X', 'Y']) 

地點:

+---+---+---+ 
| id| X| Y| 
+---+---+---+ 
| 01|0.2|0.9| 
| 02|0.3|0.6| 
| 03|0.8|0.1| 
+---+---+---+ 

網站:

+----+---+---+ 
|name| X| X| 
+----+---+---+ 
| A|0.7|0.1| 
| B|0.3|0.7| 
| C|0.9|0.3| 
| D|0.3|0.8| 
+----+---+---+ 

現在我想計算哪些是最接近的有效途徑站點的位置。所以,我得到這樣的:

+----+---+ 
|name| id| 
+----+---+ 
| A| 03| 
| B| 02| 
| C| 03| 
| D| 01| 
+----+---+ 

我想首先讓所有的信息的一個的大數據幀,然後使用的map/reduce來獲取位置標識的最接近的所有網站。然而,我不知道這是否是正確的方法,或者我會如何用火花去做這件事。目前我使用的是:

closest_locations = [] 
for s in sites.rdd.collect(): 
    min_dist = float('inf') 
    min_loc = None 
    for l in locations.rdd.collect(): 
     dist = (l.X - s.X)**2 + (l.Y - s.Y)**2 
     if dist < min_dist: 
      min_dist = dist 
      min_loc = l.id 
    closest_locations.append((s.name, min_loc)) 

selected_locations = sqlContext.createDataFrame(closest_locations, schema=['name', 'id']) 

但我想要一個更像火花的方法,因爲上面顯然很慢。如何有效地評估兩個火花數據幀的所有行組合?

回答

3

您可以:

from pyspark.sql.functions import udf, struct 
from pyspark.sql import DoubleType 


dist = udf(lamdba x1, y1, x2, y2: (x1 - x2)**2 + (y1 - y1)**2, DoubleType()) 

locations.join(sites).withColumn("dist", dist(
    locations.X, locations.Y, sites.X, sites.Y)).select(
    "name", struct("id", "dist") 
).rdd.reduceByKey(lambda x, y: min(x, y, key=lambda x: x[1]))