2012-11-21 49 views
10

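Fast interpolation over 3D array

I have a 3D array that I need to interpolate along one axis (the last dimension). Say y.shape = (nx, ny, nz); I want to interpolate along nz for every (nx, ny). However, I want to interpolate to a different value at each [i, j].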

Here is some code to illustrate. If I wanted to interpolate to a single value, say new_z, I would use scipy.interpolate.interp1d like this:

import scipy.interpolate

# y is a 3D ndarray
# x is a 1D ndarray with the abscissa values
# new_z is a number
f = scipy.interpolate.interp1d(x, y, axis=-1, kind='linear')
result = f(new_z)

However, what I actually want for this problem is to interpolate to a different new_z for each y[i, j]. So I do this:

import numpy
import scipy.interpolate

# y is a 3D ndarray
# x is a 1D ndarray with the abscissa values
# new_z is a 2D array
nx, ny = y.shape[:2]
result = numpy.empty(y.shape[:-1])
for i in range(nx):
    for j in range(ny):
        f = scipy.interpolate.interp1d(x, y[i, j], axis=-1, kind='linear')
        result[i, j] = f(new_z[i, j])

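Unfortunately, with the nested loops this becomes inefficient and slow. Is there a better way to do this kind of interpolation? Linear interpolation is sufficient. One possibility is to implement this in Cython, but I was trying to avoid that because I want the flexibility to switch to cubic interpolation and don't want to do it by hand in Cython.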

Answers

6

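To speed up higher-order interpolation, you can call interp1d() only once and then use its _spline attribute together with the low-level function _bspleval() from the _fitpack module. Here is the code: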

from scipy.interpolate import interp1d
import numpy as np

nx, ny, nz = 30, 40, 50
x = np.arange(0, nz, 1.0)
y = np.random.randn(nx, ny, nz)
# random sample points in [0.1, nz - 1], on a 0.1 grid
new_x = np.random.randint(1, (nz-1)*10 + 1, size=(nx, ny))/10.0

def original_interpolation(x, y, new_x):
    result = np.empty(y.shape[:-1])
    for i in range(nx):
        for j in range(ny):
            f = interp1d(x, y[i, j], axis=-1, kind=3)
            result[i, j] = f(new_x[i, j])
    return result

def fast_interpolation(x, y, new_x):
    # NOTE: _bspleval and the tuple-valued f._spline are private SciPy
    # internals, not a stable API; they may be missing in other releases.
    from scipy.interpolate._fitpack import _bspleval
    f = interp1d(x, y, axis=-1, kind=3)   # fit all splines in one call
    xj, cvals, k = f._spline
    result = np.empty_like(new_x)
    for (i, j), value in np.ndenumerate(new_x):
        result[i, j] = _bspleval(value, x, cvals[:, i, j], k, 0)
    return result

r1 = original_interpolation(x, y, new_x)
r2 = fast_interpolation(x, y, new_x)

>>> np.allclose(r1, r2)
True

%timeit original_interpolation(x, y, new_x)
1 loops, best of 3: 3.78 s per loop

%timeit fast_interpolation(x, y, new_x)
100 loops, best of 3: 15.4 ms per loop
+0

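Thanks. Your solution is also very interesting. I'm surprised by so many good answers. Unfortunately, I can only accept one. Although your solution doesn't give the speedup of Cython or @pv's solution, it fits the problem as posed better, and it is the most flexible in terms of interpolation. So I'm accepting it. – tiago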

+0

I tried to run this code, but I get this error: 'BSpline' object is not iterable – Delosari
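
The error in the comment above suggests that f._spline is not a tuple in the SciPy release being used. Below is a minimal sketch of one way to get per-pixel cubic interpolation through public API only. It swaps in scipy.interpolate.CubicSpline and evaluates its piecewise-polynomial coefficients by hand, so it is not the answer's method, and the function name cubic_interp_last_axis is made up for illustration. It assumes x is strictly increasing and that new_x lies within [x[0], x[-1]]; results may differ slightly from interp1d(kind=3) if the boundary conditions differ.

```python
import numpy as np
from scipy.interpolate import CubicSpline


def cubic_interp_last_axis(x, y, new_x):
    """Cubic-spline interpolate y[i, j, :] at new_x[i, j] (hypothetical helper)."""
    # Fit all splines in one call; cs.c has shape (4, nz - 1, nx, ny).
    cs = CubicSpline(x, y, axis=-1)
    # Interval index k such that x[k] <= new_x < x[k + 1], clipped to valid range.
    k = np.clip(np.searchsorted(x, new_x, side="right") - 1, 0, len(x) - 2)
    dx = new_x - x[k]
    i, j = np.indices(new_x.shape)
    # Pick the four polynomial coefficients for each pixel: shape (4, nx, ny).
    c = cs.c[:, k, i, j]
    # Horner evaluation of c0*dx**3 + c1*dx**2 + c2*dx + c3.
    return ((c[0] * dx + c[1]) * dx + c[2]) * dx + c[3]
```

The spline is fitted once for the whole array, and the only per-pixel work is an index lookup plus a cubic polynomial evaluation, all done with array operations.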

3

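I don't think interp1d has a fast way of doing this, so you can't avoid the loop here.

You can probably still avoid Cython, though, by coding the linear interpolation with np.searchsorted, something like this (not tested):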

import numpy as np

def interp3d(x, y, new_x):
    assert x.ndim == 1 and y.ndim == 3 and new_x.ndim == 2 
    assert y.shape[:2] == new_x.shape and x.shape == y.shape[2:] 

    nx, ny = y.shape[:2] 
    new_x = new_x.ravel() 
    j = np.arange(len(new_x)) 
    k = np.searchsorted(x, new_x).clip(1, len(x) - 1) 
    y = y.reshape(-1, x.shape[0]) 
    p = (new_x - x[k-1])/(x[k] - x[k-1]) 
    result = (1 - p) * y[j,k-1] + p * y[j,k] 
    return result.reshape(nx, ny) 

It doesn't help with cubic interpolation, though.
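
To make the index arithmetic concrete, here is a small worked example of the same searchsorted / clip / weight steps on a single 1-D row. The arrays are made-up illustration data, not taken from the answer.

```python
import numpy as np

x = np.array([0.0, 1.0, 2.0, 3.0])          # abscissa values
y_row = np.array([10.0, 20.0, 40.0, 80.0])  # one y[i, j, :] row
new_x = np.array([1.5, 0.0, 3.0])           # interior point and both end points

# Index of the right-hand neighbour; clip keeps k - 1 >= 0 at the left edge.
k = np.searchsorted(x, new_x).clip(1, len(x) - 1)
# Fractional position of new_x inside the interval [x[k-1], x[k]].
p = (new_x - x[k - 1]) / (x[k] - x[k - 1])
result = (1 - p) * y_row[k - 1] + p * y_row[k]

print(k)       # [2 1 3]
print(result)  # [30. 10. 80.]
```

Without the clip, the query at new_x = 0.0 would give k = 0, and k - 1 would wrap around to the last element.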

EDIT: made it a function and fixed an off-by-one error. Some timings versus Cython (500x500x500 grid):

In [58]: %timeit interp3d(x, y, new_x) 
10 loops, best of 3: 82.7 ms per loop 

In [59]: %timeit cyfile.interp3d(x, y, new_x) 
10 loops, best of 3: 86.3 ms per loop 

In [60]: abs(interp3d(x, y, new_x) - cyfile.interp3d(x, y, new_x)).max() 
Out[60]: 2.2204460492503131e-16 

Although, one can argue that the Cython code is easier to read.

+0

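Thanks, it's definitely an elegant way of doing it with numpy. I ended up with a quick Cython solution (see my answer). Writing the Cython was faster than waiting for the Python version to finish running. – tiago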

+0

@pv. You can avoid the loop by [performing the interpolation as a vector operation](http://stackoverflow.com/a/13495570/709852) –

+0

@HenryGomersall: Yes (that's what I tried in the code above), but what I meant is that it's not possible if you want to stick with interp1d. –

3

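Since the numpy suggestion above was taking longer than I could wait, here is the Cython version for future reference. From some loose benchmarks it is about 3000 times faster (granted, it is only linear interpolation and doesn't do as much as interp1d, but that is fine for this purpose).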

import numpy as N 
cimport numpy as N 
cimport cython 

DTYPEf = N.float64 
ctypedef N.float64_t DTYPEf_t 

@cython.boundscheck(False)  # turn off bounds-checking for entire function
@cython.wraparound(False)   # turn off negative-index wrapping for entire function
cpdef interp3d(N.ndarray[DTYPEf_t, ndim=1] x, N.ndarray[DTYPEf_t, ndim=3] y,
               N.ndarray[DTYPEf_t, ndim=2] new_x):
    """
    interp3d(x, y, new_x)

    Performs linear interpolation over the last dimension of a 3D array,
    according to new values from a 2D array new_x. Thus, interpolate
    y[i, j, :] for new_x[i, j].

    Parameters
    ----------
    x : 1-D ndarray (double type)
        Array containing the x (abscissa) values. Must be monotonically
        increasing.
    y : 3-D ndarray (double type)
        Array containing the y values to interpolate.
    new_x : 2-D ndarray (double type)
        Array with new abscissas to interpolate.

    Returns
    -------
    new_y : 2-D ndarray
        Interpolated values.
    """
    cdef int nx = y.shape[0]
    cdef int ny = y.shape[1]
    cdef int nz = y.shape[2]
    cdef int i, j, k
    cdef N.ndarray[DTYPEf_t, ndim=2] new_y = N.zeros((nx, ny), dtype=DTYPEf)

    for i in range(nx):
        for j in range(ny):
            # Find the first x[k] above new_x[i, j], then interpolate linearly
            # between k - 1 and k. If new_x[i, j] >= x[nz - 1], no such k
            # exists and new_y[i, j] keeps its initial value of 0.
            for k in range(1, nz):
                if x[k] > new_x[i, j]:
                    new_y[i, j] = (y[i, j, k] - y[i, j, k - 1]) * \
                        (new_x[i, j] - x[k - 1]) / (x[k] - x[k - 1]) + y[i, j, k - 1]
                    break
    return new_y
1

You can use map_coordinates for this:

from numpy import random, meshgrid, arange 
from scipy.ndimage import map_coordinates 

(nx, ny, nz) = (4, 5, 6) 
# some random array 
A = random.rand(nx, ny, nz) 

# random floating-point indices in [0, nz-1] 
Z = random.rand(nx, ny)*(nz-1) 

# regular integer indices of shape (nx,ny) 
X, Y = meshgrid(arange(nx), arange(ny), indexing='ij') 

coords = (X, Y, Z) # X, Y, and Z are of shape (nx, ny) 

print(map_coordinates(A, coords, order=1, cval=-999.))
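
map_coordinates works in index space, so the Z array above holds fractional indices along the last axis rather than abscissa values. Here is a sketch of how the question's x and new_x could be fed to it, assuming x is increasing but not necessarily evenly spaced. The helper name interp_last_axis and the use of np.interp to convert abscissas to fractional indices are illustrative choices, not part of the answer.

```python
import numpy as np
from scipy.ndimage import map_coordinates


def interp_last_axis(x, y, new_x):
    """Linear interpolation of y[i, j, :] at new_x[i, j] (hypothetical helper)."""
    nx, ny, nz = y.shape
    # Convert abscissa values to fractional indices along the last axis.
    frac_idx = np.interp(new_x.ravel(), x, np.arange(nz)).reshape(new_x.shape)
    # Integer indices for the first two axes, shape (nx, ny) each.
    ii, jj = np.meshgrid(np.arange(nx), np.arange(ny), indexing="ij")
    # order=1 is linear interpolation between neighbouring samples.
    return map_coordinates(y, (ii, jj, frac_idx), order=1, mode="nearest")
```

Because np.interp clamps to the ends of x, points outside [x[0], x[-1]] take the edge values here instead of the cval fill used above.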
+0

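I had thought about 'map_coordinates', thanks for the suggestion. In my case, 'nx, ny, nz' are each close to 500, so I think 'map_coordinates' could be a bit greedy with RAM. I'll run some benchmarks and report back. – tiago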

2

Building on @pv.'s answer and vectorising the inner loop, the following gives a substantial speedup (EDIT: changed the expensive numpy.tile to use numpy.lib.stride_tricks.as_strided):

import numpy
from scipy import interpolate

nx = 30
ny = 40
nz = 50

y = numpy.random.randn(nx, ny, nz)
x = numpy.float64(numpy.arange(0, nz))

# We select some locations in the range [0.1, nz-1]
new_z = numpy.random.randint(1, (nz-1)*10 + 1, size=(nx, ny))/10.0

# y is a 3D ndarray
# x is a 1D ndarray with the abscissa values
# new_z is a 2D array

def original_interpolation():
    result = numpy.empty(y.shape[:-1])
    for i in range(nx):
        for j in range(ny):
            f = interpolate.interp1d(x, y[i, j], axis=-1, kind='linear')
            result[i, j] = f(new_z[i, j])

    return result

grid_x, grid_y = numpy.mgrid[0:nx, 0:ny]
def faster_interpolation():
    flat_new_z = new_z.ravel()
    k = numpy.searchsorted(x, flat_new_z)
    k = k.reshape(nx, ny)

    # index tuples (not lists) so that NumPy treats them as one index per axis
    lower_index = (grid_x, grid_y, k-1)
    upper_index = (grid_x, grid_y, k)

    # zero-stride view of x with shape (nx, ny, nz); no data is copied
    tiled_x = numpy.lib.stride_tricks.as_strided(x, shape=(nx, ny, nz),
        strides=(0, 0, x.itemsize))

    z_upper = tiled_x[upper_index]
    z_lower = tiled_x[lower_index]

    z_step = z_upper - z_lower
    z_delta = new_z - z_lower

    y_lower = y[lower_index]
    result = y_lower + z_delta * (y[upper_index] - y_lower)/z_step

    return result

# both should be the same (giving a small difference)
print(numpy.max(
    numpy.abs(original_interpolation() - faster_interpolation())))

On my machine this gives the following timings:

In [8]: timeit foo.original_interpolation() 
10 loops, best of 3: 102 ms per loop 

In [9]: timeit foo.faster_interpolation() 
1000 loops, best of 3: 564 us per loop 

Going to nx = 300, ny = 300, nz = 500 gives a 130x speedup:

In [2]: timeit original_interpolation() 
1 loops, best of 3: 8.27 s per loop 

In [3]: timeit faster_interpolation() 
10 loops, best of 3: 60.1 ms per loop 

You would need to write your own algorithm for cubic interpolation, but it shouldn't be that hard.
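
For comparison, the same gather-and-blend step can be sketched with numpy.take_along_axis, which avoids building the index grids and the strided view. This is an alternative formulation offered under the assumption that a NumPy release providing take_along_axis is available; the function name interp_take_along_axis is made up for illustration.

```python
import numpy as np


def interp_take_along_axis(x, y, new_z):
    """Linear interpolation of y[i, j, :] at new_z[i, j] (hypothetical helper)."""
    # Right-hand neighbour index for every (i, j); clip guards both edges.
    k = np.clip(np.searchsorted(x, new_z), 1, len(x) - 1)
    # Gather y[i, j, k-1] and y[i, j, k] along the last axis.
    y_lower = np.take_along_axis(y, (k - 1)[..., None], axis=-1)[..., 0]
    y_upper = np.take_along_axis(y, k[..., None], axis=-1)[..., 0]
    # x is 1-D, so plain fancy indexing gives the bracketing abscissas.
    t = (new_z - x[k - 1]) / (x[k] - x[k - 1])
    return y_lower + t * (y_upper - y_lower)
```

The result should agree with per-pixel np.interp for points inside the range of x; outside that range this sketch extrapolates linearly from the end intervals.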

2

Although there are several nice answers, they are still doing 250k interpolations in a fixed 500-long array:

j250k = np.searchsorted(X500, X250k) # indices in [0, 500) 

This can be sped up with a LUT, a lookup table with, say, 5k slots:

lut = np.interp(np.arange(5000), X500, np.arange(500)).round().astype(int) 
xscale = (X - X.min()) * (5000 - 1) \ 
     /(X.max() - X.min()) 
j = lut.take(xscale.astype(int), mode="clip") # take(floats) in numpy 1.7 ? 

#--------------------------------------------------------------------------- 
# X  | |  | |    | 
# j  0 1  2 3    4 ... 
# LUT |....|.......|.|.............|.... -> int j (+ offset in [0, 1)) 
#--------------------------------------------------------------------------- 

searchsorted is pretty fast, with time ~ log2 500, so this is probably not much faster.
But LUTs are very fast in C, and are a simple speed/memory trade-off.
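
As a rough sketch of the lookup-table idea in plain NumPy: the table stores, for each evenly spaced slot, the interval index at the slot's left edge, and a short forward scan corrects the few cases where the query lies past the next knot. The helper names build_lut and lut_lookup, the choice to start one slot early as a guard against rounding, and the correction loop are all assumptions made for this illustration. They are not taken from the answer's snippet, and no speed claim is made for this version.

```python
import numpy as np


def build_lut(x, n_slots):
    """For each slot's left edge, the index j with x[j] <= edge < x[j + 1]."""
    left_edges = np.linspace(x[0], x[-1], n_slots + 1)[:-1]
    return np.searchsorted(x, left_edges, side="right") - 1


def lut_lookup(x, lut, v):
    """Interval index j (0 <= j <= len(x) - 2) bracketing each value in v."""
    n_slots = len(lut)
    scale = n_slots / (x[-1] - x[0])
    slot = ((v - x[0]) * scale).astype(int)
    # Start one slot early so the table value is a safe lower bound.
    j = lut[np.clip(slot - 1, 0, n_slots - 1)]
    last = len(x) - 2
    while True:
        # Advance entries whose next knot is still at or below the query.
        step = (j < last) & (x[j + 1] <= v)
        if not step.any():
            return j
        j = j + step
```

With many more slots than knots, most queries need zero or one correction step, which is where the speed/memory trade-off comes from.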