2014-07-04 117 views
1

我有均勻分佈的數據系列。我希望利用分佈來並行排序數據。對於N個CPU,我基本上定義了N個存儲桶並對這些存儲桶進行了並行排序。我的問題是,我沒有加快速度。「桶排序」與蟒蛇多處理

出了什麼問題?

from multiprocessing import Process, Queue 
from numpy import array, linspace, arange, where, cumsum, zeros 
from numpy.random import rand 
from time import time 


def my_sort(x,y): 
    y.put(x.get().argsort()) 

def my_par_sort(X,np): 
p_list=[] 
Xq = Queue() 
Yq = Queue() 
bmin = linspace(X.min(),X.max(),np+1) #bucket lower bounds 
bmax = array(bmin); bmax[-1] = X.max()+1 #bucket upper bounds 
B = [] 
Bsz = [0] 
for i in range(np): 
    b = array([bmin[i] <= X, X < bmax[i+1]]).all(0) 
    B.append(where(b)[0]) 
    Bsz.append(len(B[-1])) 
    Xq.put(X[b]) 
    p = Process(target=my_sort, args=(Xq,Yq)) 
    p.start() 
    p_list.append(p) 

Bsz = cumsum(Bsz).tolist() 
Y = zeros(len(X)) 
for i in range(np): 
    Y[arange(Bsz[i],Bsz[i+1])] = B[i][Yq.get()] 
    p_list[i].join() 

return Y 


if __name__ == '__main__': 
num_el = 1e7 
mydata = rand(num_el) 
np = 4 #multiprocessing.cpu_count() 
starttime = time() 
I = my_par_sort(mydata,np) 
print "Sorting %0.0e keys took %0.1fs using %0.0f processes" % (len(mydata),time()-starttime,np) 
starttime = time() 
I2 = mydata.argsort() 
print "in serial it takes %0.1fs" % (time()-starttime) 
print (I==I2).all() 

回答

0

在我看來,有兩個主要的問題。

  1. 多個進程的開銷,他們

    產卵幾個Python解釋之間通信造成一些開銷,但主要是將數據傳遞,並從「工人」的過程被查殺性能。你穿過Queue數據需要被「醃」和「拆封」,這是更大的數據有點慢(和你需要做的這兩次)。

    你並不需要使用Queue■如果你使用的線程,而不是過程。使用CPython中的線程處理CPU繁重任務通常被認爲效率低下,因爲通常您會遇到Global Interpreter Lock,但並非總是如此!幸運的是Numpy的排序功能似乎釋放了GIL,所以使用線程是一個可行的選擇!

  2. 的分區和數據集

    分區的接合以及接合所述數據是這樣的「桶排序的方法」的一個必然的成本,但可以通過更有效地做有所緩解。在代碼particar這兩行

    b = array([bmin[i] <= X, X < bmax[i+1]]).all(0) 
    
    Y[arange(Bsz[i],Bsz[i+1])] = ... 
    

    可以rewriten到

    b = (bmin[i] <= X) & (X < bmax[i+1]) 
    
    Y[Bsz[i] : Bsz[i+1]] = ... 
    

    提高些我也發現np.take比「神奇索引」和np.partition也是有用的更快。

彙總,最快我可以使它如下(但它仍然沒有線性內核的數量規模像你想..):

from threading import Thread 

def par_argsort(X, nproc): 
    N = len(X) 
    k = range(0, N+1, N//nproc) 
    I = X.argpartition(k[1:-1]) 
    P = X.take(I) 

    def worker(i): 
     s = slice(k[i], k[i+1]) 
     I[s].take(P[s].argsort(), out=I[s]) 

    t_list = [] 
    for i in range(nproc): 
     t = Thread(target=worker, args=(i,)) 
     t.start() 
     t_list.append(t) 

    for t in t_list: 
     t.join() 

    return I 
2

看起來像你的問題是當你將原始數組分解成幾部分時,你添加的開銷。我把你的代碼,只是刪除的multiprocessing所有使用:

def my_sort(x,y): 
    pass 
    #y.put(x.get().argsort()) 

def my_par_sort(X,np, starttime): 
    p_list=[] 
    Xq = Queue() 
    Yq = Queue() 
    bmin = linspace(X.min(),X.max(),np+1) #bucket lower bounds 
    bmax = array(bmin); bmax[-1] = X.max()+1 #bucket upper bounds 
    B = [] 
    Bsz = [0] 
    for i in range(np): 
     b = array([bmin[i] <= X, X < bmax[i+1]]).all(0) 
     B.append(where(b)[0]) 
     Bsz.append(len(B[-1])) 
     Xq.put(X[b]) 
     p = Process(target=my_sort, args=(Xq,Yq, i)) 
     p.start() 
     p_list.append(p) 
    return 

if __name__ == '__main__': 
    num_el = 1e7 
    mydata = rand(num_el) 
    np = 4 #multiprocessing.cpu_count() 
    starttime = time() 
    I = my_par_sort(mydata,np, starttime) 
    print "Sorting %0.0e keys took %0.1fs using %0.0f processes" % (len(mydata),time()-starttime,np) 
    starttime = time() 
    I2 = mydata.argsort() 
    print "in serial it takes %0.1fs" % (time()-starttime) 
    #print (I==I2).all() 

完全沒有排序發生,multiprocessing代碼只需只要串行代碼:

Sorting 1e+07 keys took 2.2s using 4 processes 
in serial it takes 2.2s 

你可能會想,啓動進程和在它們之間傳遞值的開銷是造成開銷的原因,但是如果刪除multiprocessing的所有用法(包括Xq.put(X[b])調用),結果只會稍快一點:

Sorting 1e+07 keys took 1.9s using 4 processes 
in serial it takes 2.2s 

因此,看來你需要調查破壞你的陣列成片的更有效的方式。