First things first: if this is about multiple cores on the same processor, numpy is already capable of parallelizing things better than we could ever manage by hand (see the discussion at multiplication of large arrays in python). The key in that case is simply to ensure that the multiplication is all done in one wholesale array operation rather than in a Python for-loop:
test2 = x[n.newaxis, :] * y[:, n.newaxis]
n.abs(test - test2).max()  # verify equivalence to mult(): output should be 0.0, or very small, reflecting floating-point precision limitations
[If you really do want to spread this across multiple separate CPUs, that is a different matter, but the question seems to suggest a single (multi-core) CPU.]
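(For reference, here is a minimal self-contained sketch of the kind of loop-based mult() being replaced. The names x, y, test and mult come from the question; the loop body is an assumption about what mult() does, not the questioner's actual code.)

import numpy as n

x = n.random.rand(2000)
y = n.random.rand(1000)

def mult(x, y):
    # hypothetical pure-Python version: builds the outer product one row at a time
    out = n.empty((len(y), len(x)))
    for i in range(len(y)):
        out[i] = y[i] * x
    return out

test = mult(x, y)                          # slow: Python-level loop over rows
test2 = x[n.newaxis, :] * y[:, n.newaxis]  # fast: one wholesale broadcast operation
print(n.abs(test - test2).max())           # 0.0 here, since both compute identical products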
OK, bearing the above in mind: let's suppose you want to parallelize an operation that is more complicated than just mult(). Let's assume you have tried hard to optimize your operation into wholesale array operations that numpy can parallelize by itself, but your operation just isn't susceptible to that. In that case, you can use a shared-memory multiprocessing.Array created with lock=False, together with multiprocessing.Pool, to assign processes to address non-overlapping chunks of it, divided up over the y dimension (and also over x at the same time, if you want). An example listing is provided below. Note that this approach does not explicitly do exactly what you specified (clump the results together and append them into a single array). Rather, it does something more efficient: multiple processes simultaneously assemble their portions of the answer in non-overlapping portions of shared memory. Once done, no collation/appending is needed: we just read out the result.
import os, numpy, multiprocessing, itertools

SHARED_VARS = {}  # the best way to get multiprocessing.Pool to send shared multiprocessing.Array objects between processes is to attach them to something global - see http://stackoverflow.com/questions/1675766/

def operate(slices):
    # grok the inputs
    yslice, xslice = slices
    y, x, r = get_shared_arrays('y', 'x', 'r')
    # create views of the appropriate chunks/slices of the arrays:
    y = y[yslice]
    x = x[xslice]
    r = r[yslice, xslice]
    # do the actual business
    for i in range(len(r)):
        r[i] = y[i] * x
        # If this is truly all operate() does, it can be parallelized far more
        # efficiently by numpy itself. But let's assume this is a placeholder
        # for something more complicated.
    return 'Process %d operated on y[%s] and x[%s] (%d x %d chunk)' % (os.getpid(), slicestr(yslice), slicestr(xslice), y.size, x.size)

def check(y, x, r):
    r2 = x[numpy.newaxis, :] * y[:, numpy.newaxis]  # obviously this check is only valid if operate() literally does nothing but multiplication (in which case this whole business is unnecessary)
    print('max. abs. diff. = %g' % numpy.abs(r - r2).max())
    return y, x, r

def slicestr(s):
    return ':'.join('' if x is None else str(x) for x in [s.start, s.stop, s.step])

def m2n(buf, shape, typecode, ismatrix=False):
    """
    Return a numpy.array VIEW of a multiprocessing.Array given a
    handle to the array, the shape, the data typecode, and a boolean
    flag indicating whether the result should be cast as a matrix.
    """
    a = numpy.frombuffer(buf, dtype=typecode).reshape(shape)
    if ismatrix: a = numpy.asmatrix(a)
    return a

def n2m(a):
    """
    Return a multiprocessing.Array COPY of a numpy.array, together
    with shape, typecode and matrix flag.
    """
    if not isinstance(a, numpy.ndarray): a = numpy.array(a)
    return multiprocessing.Array(a.dtype.char, a.flat, lock=False), tuple(a.shape), a.dtype.char, isinstance(a, numpy.matrix)

def new_shared_array(shape, typecode='d', ismatrix=False):
    """
    Allocate a new shared array and return all the details required
    to reinterpret it as a numpy array or matrix (same order of
    output arguments as n2m)
    """
    typecode = numpy.dtype(typecode).char
    return multiprocessing.Array(typecode, int(numpy.prod(shape)), lock=False), tuple(shape), typecode, ismatrix

def get_shared_arrays(*names):
    return [m2n(*SHARED_VARS[name]) for name in names]

def init(*pargs, **kwargs):
    SHARED_VARS.update(pargs, **kwargs)

if __name__ == '__main__':

    ylen = 1000
    xlen = 2000

    init(y=n2m(range(ylen)))
    init(x=n2m(numpy.random.rand(xlen)))
    init(r=new_shared_array([ylen, xlen], float))

    print('Master process ID is %s' % os.getpid())
    #print(operate([slice(None), slice(None)])); check(*get_shared_arrays('y', 'x', 'r'))  # local test

    pool = multiprocessing.Pool(initializer=init, initargs=tuple(SHARED_VARS.items()))  # tuple() keeps initargs picklable under the 'spawn' start method

    yslices = [slice(0,333), slice(333,666), slice(666,None)]
    xslices = [slice(0,1000), slice(1000,None)]
    #xslices = [slice(None)]  # uncomment this if you only want to divide things up in the y dimension

    reports = pool.map(operate, itertools.product(yslices, xslices))
    print('\n'.join(reports))
    y, x, r = check(*get_shared_arrays('y', 'x', 'r'))
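(A side note, not part of the original listing: lock=False is safe here only because each worker writes to a disjoint region of r. If you would rather compute the chunk boundaries than hard-code yslices and xslices as above, a small helper along these lines could generate them; make_slices is a hypothetical name, not a library function.)

def make_slices(total, nchunks):
    # split range(total) into nchunks contiguous slices of near-equal size
    step = -(-total // nchunks)  # ceiling division
    return [slice(i, min(i + step, total)) for i in range(0, total, step)]

yslices = make_slices(ylen, 3)  # e.g. [slice(0, 334), slice(334, 668), slice(668, 1000)]
xslices = make_slices(xlen, 2)  # e.g. [slice(0, 1000), slice(1000, 2000)]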
How about a [mcve]? – boardrider
You'll never program anything in Python that runs faster than numpy's internal broadcasting mechanisms, even with multithreading/multiprocessing... just let numpy do it internally. – Aaron
Be careful not to use multithreading/multiprocessing just for its own sake. Doing only a small amount of work on a large amount of data will leave the CPU limited by the speed of the memory bus (which is slow compared with the CPU's cache). So if your algorithm is I/O-bound, adding more threads won't produce a speed-up. – bazza