2017-04-22 267 views
2

我有2個數組大小不等的:numpy的陣列比較和索引

>>> np.size(array1) 
4004001 
>>> np.size(array2) 
1000 

現在,在數組2的每個元素需要進行比較,以在ARRAY1的所有元素,以找到具有最接近的值的元素在array2中的這個元素的那個。 找到此值後,我需要將其存儲在大小爲1000的不同數組中 - 其中一個大小對應於array2。

這樣做的單調乏味和粗糙的方式可能是使用for循環,並從數組2中取出每個元素,從數組1中減去其絕對值,然後取最小值 - 這會讓我的代碼非常慢。

我想使用numpy矢量化操作來做到這一點,但我有點碰壁。

+1

首先對兩個數組進行排序。然後遍歷大數組,保持小數組中當前最接近的元素的索引。根據需要增加索引。如果itertools中有些東西會加快速度,我不會感到驚訝。 –

+1

[在numpy數組中找到最接近的值]的可能重複(http://stackoverflow.com/questions/2566412/find-nearest-value-in-numpy-array) –

回答

1

要充分利用numpy並行性,我們需要矢量化函數。此外,使用相同的標準(最近)在相同的數組(array1)中找到所有值。因此,可以製作一個專門用於在array1中搜索的特殊功能。

但是,爲了使解決方案更具可重用性,最好製作更通用的解決方案,然後將其轉換爲更具體的解決方案。因此,作爲找到最接近的值的一般方法,我們從this find nearest solution開始。然後我們把它轉換成一個更加具體和矢量化它,允許它在一次多個元素上工作:

import math 
import numpy as np 
from functools import partial 

def find_nearest_sorted(array,value): 
    idx = np.searchsorted(array, value, side="left") 
    if idx > 0 and (idx == len(array) or math.fabs(value - array[idx-1]) < math.fabs(value - array[idx])): 
     return array[idx-1] 
    else: 
     return array[idx] 

array1 = np.random.rand(4004001) 
array2 = np.random.rand(1000) 

array1_sorted = np.sort(array1) 

# Partially apply array1 to find function, to turn the general function 
# into a specific, working with array1 only. 
find_nearest_in_array1 = partial(find_nearest_sorted, array1_sorted) 

# Vectorize specific function to allow us to apply it to all elements of 
# array2, the numpy way. 
vectorized_find = np.vectorize(find_nearest_in_array1) 

output = vectorized_find(array2) 

希望這是你想要的,一個新的載體,映射數據array2到最近的值在array1

+0

而且,由於我們要查看'array1'多個次(1000次),首先對數組進行排序,從而節省一次排序成本,以加快隨後的每次查找操作。 – JohanL

+0

謝謝@JohanL和大家的幫助!我以前從未使用過functools。這很棒! – sb25

0
import numpy as np 
a = np.random.random(size=4004001).astype(np.float16) 
b = np.random.random(size=1000).astype(np.float16) 
#use numpy broadcasting to compare pairwise difference and then find the min arg in a for each element in b. Finally extract elements from a using the argmin array as indexes. 
output = a[np.argmin(np.abs(b[:,None] -a),axis=1)] 

這個解決方案雖然簡單,但可能會佔用大量內存。如果在大型陣列上使用它,可能需要進一步優化。

+0

這個解決方案的時間和空間複雜度相當大,因爲它將問題擴展到一個尺寸爲4004001x1000的矩陣,然後它不對array1進行排序,從而使得find('min')操作比需要的慢成爲。 – JohanL

+0

是的,我意識到這一點,並且正在考慮優化它的方法,同時保持其簡單性。 – Allen

+0

請編輯您的答案以包含一些解釋。僅有代碼的答案對未來SO讀者的教育很少。您的回答是在低質量的審覈隊列中。 – mickmackusa

0

最「numpythonic」的方式是使用broadcasting。這是計算距離矩陣的一種快速而簡單的方法,然後您可以獲取絕對值的argmin

形狀的
array1 = np.random.rand(4004001) 
array2 = np.random.rand(1000) 

# Calculate distance matrix (on truncated array1 for memory reasons) 
dmat = array1[:400400] - array2[:,None] 

# Take the abs of the distance matrix and work out the argmin along the last axis 
ix = np.abs(dmat).argmin(axis=1) 

dmat

(1000, 400400) 

的形狀ix和內容:

(1000,)  
array([237473, 166831, 72369, 11663, 22998, 85179, 231702, 322752, ...]) 

然而,它的內存餓了,如果你在一個去做這個手術了,居然不在我的8GB機器上處理您指定的陣列大小,這就是爲什麼我減小了array1的大小的原因。

要使其在內存限制內工作,只需將其中一個數組切片爲塊,然後依次(或平行)在每個塊上應用廣播。在這種情況下,我將array2分爲10個區塊:

# Define number of chunks and calculate chunk size 
n_chunks = 10 
chunk_len = array2.size // n_chunks 

# Preallocate output array 
out = np.zeros(1000) 

for i in range(n_chunks): 
    s = slice(i*chunk_len, (i+1)*chunk_len) 
    out[s] = np.abs(array1 - array2[s, None]).argmin(axis=1) 
+0

你的解決方案仍然相當餓死,即使是大塊頭。它也很慢,因爲它的最小操作是O(n),對於未排序的列表。這就是爲什麼我覺得需要一種更復雜的方法,但時間複雜性大大提高。 – JohanL

+0

但它的工作原理很容易理解。如果速度和內存是無法通過並行解決的OP的重要問題,那麼更復雜的方法是合理的。 – FuzzyDuck