2017-09-18 64 views
1

我在x,y平面有很多點,長度約爲10000,每個點(x,y)的固有半徑爲r。這個小數據集只是我整個數據集的一個小角落。我有一個感興趣的點(x1,y1),我想找到1點附近的(x1,y1)附近的點,並符合(x,y)(x1,y1)之間的距離小於r的標準。我想返回這些優點的索引,而不是優點本身。高效找到切割鄰居並返回索引

import numpy as np 
np.random.seed(2000) 
x = 20.*np.random.rand(10000) 
y = 20.*np.random.rand(10000) 
r = 0.3*np.random.rand(10000) 
x1 = 10. ### (x1,y1) is an interest point 
y1 = 12. 
def index_finder(x,y,r,x1,y1): 
    idx = (abs(x - x1) < 1.) & (abs(y - y1) < 1.) ### This cut will probably cut 90% of the data 
    x_temp = x[idx] ### but if I do like this, then I lose the track of the original index 
    y_temp = y[idx] 
    dis_square = (x_temp - x1)*(x_temp - x1) + (y_temp - y1)*(y_temp - y1) 
    idx1 = dis_square < r*r ### after this cut, there are only a few left 
    x_good = x_temp[idx1] 
    y_good = y_temp[idx1] 

在此功能,我可以找到好點左右(x1,y1),但那些好點的不是指數。但是,我需要ORIGINAL索引,因爲原始索引用於提取與座標(x,y)相關的其他數據。正如我所提到的,樣本數據集只是我整個數據集的一個小角落,我將爲整個數據集調用上述函數大約1,000,000次,因此上述index_finder函數的效率也是一個考慮因素。

對此類任務有何想法?

+0

如何爲所有這些點使用index_finder?你是在循環中使用它還是就像那樣? – Divakar

+0

我將在循環中使用這個函數因爲我有很多這樣的感興趣的點,比如'(x1,y1)'。這個函數本身可以避免任何循環。而這個數據集只有我整個數據集的1/1000。 –

回答

1

方法#1

我們可以簡單的指數與自身的掩模的第一掩模用於從第二階段屏蔽值的真正的地方,像這樣 -

idx[idx] = idx1 

因此,idx將具有對應於原始陣列xy的最終有效掩蔽值/良好值的位置,即 -

x_good = x[idx] 
y_good = y[idx] 

這個掩碼可以用來索引問題中提到的其他數組。


方法2

作爲另一種方法,我們可以使用兩個條件語句,從而創建兩個口罩他們。最後,將它們與AND-ing組合以獲得組合掩碼,其可以被索引到最終輸出的xy陣列中。我們不需要以這種方式獲得實際的指數,所以這是另一個好處。

因此,實施 -

X = x-x1 
Y = y-y1 
mask1 = (np.abs(X) < 1.) & (np.abs(Y) < 1.) 
mask2 = X**2 + Y*2 < r**2 
comb_mask = mask1 & mask2 

x_good = x[comb_mask] 
y_good = y[comb_mask] 

如果由於某種原因,你仍然需要相應的指標,只是做 -

comb_idx = np.flatnonzero(comb_mask) 

如果你在做這些操作針對不同x1y1對對於相同的xy數據集,我建議使用broadcasting向量化它,通過所有那些x1,y1配對數據如this post所示。

+0

感謝您的回答。我想這個實現會有點低效。我也想加快速度,因爲我將有一個大約100萬次的循環來調用這個函數。 –

+0

@HuanianZhang一點點比什麼效率低? – Divakar

+0

我想這會比我的實施效率低一點。因爲它僅計算第二次切割中約10%的數據。但我的實現的缺點是它不能返回索引。 –

0

你可以把你的索引的面具,像這樣:

def index_finder(x,y,r,x1,y1): 
    idx = np.nonzero((abs(x - x1) < 1.) & (abs(y - y1) < 1.)) #numerical, not boolean 
    mask = (x[idx] - x1)*(x[idx] - x1) + (y[idx] - y1)*(y[idx] - y1) < r*r 
    idx1 = [i[mask] for i in idx] 
    x_good = x_temp[idx1] 
    y_good = y_temp[idx1] 

現在idx1是要提取的指標。一般

更快的方式做到這一點是使用scipy.spatial.KDTree

from scipy.spatial import KDTree 

xy = np.stack((x,y)) 
kdt = KDTree(xy) 
kdt.query_ball_point([x1, y1], r) 

如果你有很多點對查詢相同的數據集,這將是比順序調用您的index_finder應用更快。

x1y1 = np.stack((x1, y1)) #`x1` and `y1` are arrays of coordinates. 
kdt.query_ball_point(x1y1, r) 

也是錯誤的:,如果你對每個點的距離不同,你可以這樣做:

def query_variable_ball(kdtree, x, y, r): 
    out = [] 
    for x_, y_, r_ in zip(x, y, r): 
     out.append(kdt.query_ball_point([x_, y_], r_) 
    return out 

xy = np.stack((x,y)) 
kdt = KDTree(xy) 
query_variable_ball(kdt, x1, y1, r) 

編輯2:這應該與不同r值工作,每個點

from scipy.spatial import KDTree 

def index_finder_kd(x, y, r, x1, y1): # all arrays 
    xy = np.stack((x,y), axis = -1) 
    x1y1 = np.stack((x1, y1), axis = -1) 
    xytree = KDTree(xy) 
    d, i = xytree.query(x1y1, k = None, distance_upper_bound = 1.) 
    good_idx = np.zeros(x.size, dtype = bool) 
    for idx, dist in zip(i, d): 
     good_idx[idx] |= r[idx] > dist 
    x_good = x[good_idx] 
    y_good = y[good_idx] 
    return x_good, y_good, np.flatnonzero(good_idx) 

這是只有一個(x1, y1)對慢,因爲KDTree需要一段時間才能填充。但是如果你有幾百萬雙,這將會更快。

(我假設你想在(x, y)數據都好點的工會所有(x1, y1),如果你想讓他們分開它也可以採用類似的方法,消除基於對i[j]元素是否d[j] < r[i[j]]

+0

是不是'index_finder#2'與我在開始的帖子中建議的相同? – Divakar

+0

是的。沒有注意到,因爲我直接跳到了方法2。 –

+0

如果聽起來不太冒犯,你會刪除那部分?兩篇內容相同的帖子看起來不太好:) – Divakar

1

numpy.where似乎查找索引

做出

向量化的標準計算+ np.where()可能比一個循環

sq_norm = (x - x1)**2 + (y - y1)**2 # no need to take 10000 sqrt 
idcs = np.where(sq_norm < 1.) 

len(idcs[0]) 
Out[193]: 69 

np.stack((idcs[0], x[idcs], y[idcs]), axis=1)[:5] 
Out[194]: 
array([[ 38.  , 9.47165956, 11.94250173], 
     [ 39.  , 9.6966941 , 11.67505453], 
     [ 276.  , 10.68835317, 12.11589316], 
     [ 288.  , 9.93632584, 11.07624915], 
     [ 344.  , 9.48644057, 12.04911857]]) 

規範calc也可以包括r數組,第二步?

r_sq_norm = (x[idcs] - x1)**2 + (y[idcs] - y1)**2 - r[idcs]**2 
r_idcs = np.where(r_sq_norm < 0.) 

idcs[0][r_idcs] 
Out[11]: array([1575, 3476, 3709], dtype=int64) 

你可能要時間2步測試VS包括在第一量化標準計算r

sq_norm = (x - x1)**2 + (y - y1)**2 - r**2 
idcs = np.where(sq_norm < 0.) 

idcs[0] 
Out[13]: array([1575, 3476, 3709], dtype=int64)