2016-12-07 77 views
0

我目前正在解決一個涉及來自總線的GPS數據的問題。我面臨的問題是在我的過程中減少計算。在Apache Spark中過濾空間數據

在一張桌子上有大約20億個GPS座標點(Lat-Long度數),在另一個桌子上有大約12,000個Lat-Long座標的公交站點。預計20億點的5-10%在公交車站。

問題:我只需要標記和提取公交車站點(12,000點)中的那些點數(20億)。由於這是GPS數據,我不能做精確匹配的座標,而是做一個基於容差的geofencing。

問題:使用當前的樸素方法,標記公交車站的過程需要很長時間。目前,我們正在挑選12,000個公交站點中的每一個站點,並以100米的容差查詢20億個點(通過將程度差異轉換爲距離)。

問題:是否有一個算法高效的過程來實現這個標記點?

+0

使用k-d樹可以開始。 –

+0

我工作過類似的用例。我們使用'GeoHashes'的屬性來定義單元格,並定義每個單元格的過程。這仍然是一個廣泛的問題。也許你可以展示你目前的方法的代碼來推動討論? – maasg

+0

@LostInOverflow - 當然,通過它。 – OrangeRind

回答

0

是的,你可以使用類似SpatialSpark。它僅適用於Spark 1.6.1,但您可以使用BroadcastSpatialJoin創建效率極高的RTree

下面是使用SpatialSpark與PySpark我的一個例子,以檢查是否不同多邊形在彼此或相交的:

from ast import literal_eval as make_tuple 
print "Java Spark context version:", sc._jsc.version() 
spatialspark = sc._jvm.spatialspark 

rectangleA = Polygon([(0, 0), (0, 10), (10, 10), (10, 0)]) 
rectangleB = Polygon([(-4, -4), (-4, 4), (4, 4), (4, -4)]) 
rectangleC = Polygon([(7, 7), (7, 8), (8, 8), (8, 7)]) 
pointD = Point((-1, -1)) 

def geomABWithId(): 
    return sc.parallelize([ 
    (0L, rectangleA.wkt), 
    (1L, rectangleB.wkt) 
    ]) 

def geomCWithId(): 
    return sc.parallelize([ 
    (0L, rectangleC.wkt) 
    ]) 

def geomABCWithId(): 
    return sc.parallelize([ 
    (0L, rectangleA.wkt), 
    (1L, rectangleB.wkt), 
    (2L, rectangleC.wkt)]) 

def geomDWithId(): 
    return sc.parallelize([ 
    (0L, pointD.wkt) 
    ]) 

dfAB     = sqlContext.createDataFrame(geomABWithId(), ['id', 'wkt']) 
dfABC    = sqlContext.createDataFrame(geomABCWithId(), ['id', 'wkt']) 
dfC     = sqlContext.createDataFrame(geomCWithId(), ['id', 'wkt']) 
dfD     = sqlContext.createDataFrame(geomDWithId(), ['id', 'wkt']) 

# Supported Operators: Within, WithinD, Contains, Intersects, Overlaps, NearestD 
SpatialOperator  = spatialspark.operator.SpatialOperator 
BroadcastSpatialJoin = spatialspark.join.BroadcastSpatialJoin 

joinRDD = BroadcastSpatialJoin.apply(sc._jsc, dfABC._jdf, dfAB._jdf, SpatialOperator.Within(), 0.0) 

joinRDD.count() 

results = joinRDD.collect() 
map(lambda result: make_tuple(result.toString()), results) 

# [(0, 0), (1, 1), (2, 0)] read as: 
# ID 0 is within 0 
# ID 1 is within 1 
# ID 2 is within 0 

注線

joinRDD = BroadcastSpatialJoin.apply(sc._jsc, dfABC._jdf, dfAB._jdf, SpatialOperator.Within(), 0.0) 

的最後一個參數是一個緩衝器值,在你的情況下,這將是你想要使用的寬容。如果您使用緯度/經度,它可能會是一個非常小的數字,因爲它是一個徑向系統,並且取決於您想要的公差,您需要輸入calculate based on lat/lon for your area of interest