2016-06-21

I want to use multiprocessing in Python to speed up a while loop (loop-condition parallelization / multiprocessing).

More specifically:
I have a matrix (samples × features). I want to select a subset of x samples for which the values of a random subset of the features are not equal to a certain value (-1 in this example).

My serial code:

import numpy as np
import pandas as pd

np.random.seed(43)
datafile = '...'
df = pd.read_csv(datafile, sep=" ", nrows=89)

no_feat = 500 
no_samp = 5 
no_trees = 5 
i=0 
iter=0 


samples = np.zeros((no_trees, no_samp)) 
features = np.zeros((no_trees, no_feat)) 

while i < no_trees:
    rand_feat = np.random.choice(df.shape[1], no_feat, replace=False)
    iter_order = np.random.choice(df.shape[0], df.shape[0], replace=False)

    samp_idx = []
    a = 0

#--------------
    #how to run in parallel?

    for j in iter_order:
        pot_samp = df.iloc[j, rand_feat]
        if len(np.where(pot_samp == -1)[0]) == 0:
            samp_idx.append(j)
        if len(samp_idx) == no_samp:
            print a
            break
        a += 1

#--------------

    if len(samp_idx) == no_samp:
        samples[i, :] = samp_idx
        features[i, :] = rand_feat
        i += 1
    iter += 1
    if iter > 1000:  #break if subsets cannot be found
        break

Searching for fitting samples is the potentially expensive part (the inner for loop over j), which in theory could be run in parallel. In some cases it is not necessary to iterate over all samples to find a large enough subset, which is why I break out of the loop as soon as the subset is large enough.
I am struggling to find an implementation that can check how many valid results have already been generated. Is that even possible?

I have used joblib before. If I understand it correctly, it uses the multiprocessing methods as a backend, which only works for separate tasks? I was thinking that queues might help, but so far I have failed to implement them.


Using 'joblib' or 'multiprocessing.Pool' makes sense. I would run one process per core and create a shared counter, protected by a 'Lock' or implemented as an atomic integer, and increment it until a specific count is reached (taking duplicates into account); at that point all processes finish and return their results. (You could use 'apply_async()'.) – advance512


@advance512 Thanks for giving me these approaches to look into. – Dahlai
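A minimal sketch (not from the original thread) of the shared-counter pattern suggested in the comment above: one worker per core, a counter in a multiprocessing.Value protected by a Lock, and apply_async() to collect each worker's partial results. The worker below only does placeholder work, and the names (init, worker, ticket) are purely illustrative.

import multiprocessing as mp

counter = None
lock = None

def init(shared_counter, shared_lock):
    # runs once in every worker process; stores the shared objects as globals
    global counter, lock
    counter, lock = shared_counter, shared_lock

def worker(target):
    # keep producing "results" until the shared counter reaches the target
    found = []
    while True:
        with lock:
            if counter.value >= target:
                break
            counter.value += 1
            ticket = counter.value
        found.append(ticket)  # placeholder for one valid sample subset
    return found

if __name__ == '__main__':
    target = 20
    shared_counter = mp.Value('i', 0)
    shared_lock = mp.Lock()
    ncores = 4
    pool = mp.Pool(ncores, initializer=init, initargs=(shared_counter, shared_lock))
    jobs = [pool.apply_async(worker, (target,)) for _ in range(ncores)]
    pool.close()
    pool.join()
    results = [job.get() for job in jobs]
    print(sum(len(r) for r in results))  # == target

Note that the shared Value and Lock are handed to the workers via the Pool initializer because multiprocessing locks cannot be pickled as task arguments; the accepted answer below uses the same trick.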

Answer


I found a working solution. I decided to run the while loop in parallel and to let the different processes interact over a shared counter. In addition, I vectorized the search for suitable samples.

The vectorization yields a ~300x speed-up, and running on 4 cores roughly doubles the computation speed on top of that.
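For reference, here is the vectorized search condensed from the full code below (not separate code): it checks all rows at once for the random feature subset and keeps only those rows that contain no -1, replacing the per-row inner loop of the serial version. df, no_feat and no_samp are as defined above.

rand_feat = np.random.choice(df.shape[1], no_feat, replace=False)
samp_rand = df.iloc[:, rand_feat]                      # all samples, random feature subset
nan_idx = np.unique(np.where(samp_rand == -1)[0])      # rows containing at least one -1
all_idx = np.arange(df.shape[0])
notnan_idx = all_idx[np.invert(np.in1d(all_idx, nan_idx))]  # rows without any -1
if notnan_idx.shape[0] >= no_samp:
    samp_idx = np.random.choice(notnan_idx, no_samp, replace=False)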

At first I tried implementing separate processes and putting the results into a queue. It turned out that queues are not meant for storing large amounts of data.

If anybody spots another bottleneck in the code, I would be glad if they pointed it out.

With my essentially non-existent knowledge of parallel computing I found it hard to piece this together, especially since the examples on the internet are all very basic. I learned a lot though =)

My code:

import numpy as np 
import pandas as pd 
import itertools 
from multiprocessing import Pool, Lock, Value 
from datetime import datetime 
import settings 


val = Value('i', 0) 
worker_ID = Value('i', 1) 
lock = Lock() 

def findSamp(no_trees, df, no_feat, no_samp): 
    lock.acquire() 
    print 'starting worker - {0}'.format(worker_ID.value) 
    worker_ID.value +=1 
    worker_ID_local = worker_ID.value 
    lock.release() 

    max_iter = 100000 
    samp = [] 
    feat = [] 
    iter_outer = 0 
    iter = 0 
    while val.value < no_trees and iter_outer < max_iter:
        rand_feat = np.random.choice(df.shape[1], no_feat, replace=False)

        #get samples with random features from dataset;
        #find and select samples that don't have missing values in the random features
        samp_rand = df.iloc[:, rand_feat]
        nan_idx = np.unique(np.where(samp_rand == -1)[0])
        all_idx = np.arange(df.shape[0])
        notnan_bool = np.invert(np.in1d(all_idx, nan_idx))
        notnan_idx = np.where(notnan_bool == True)[0]

        if notnan_idx.shape[0] >= no_samp:
            #if enough samples for random feature subset, select no_samp samples randomly
            notnan_idx_rand = np.random.choice(notnan_idx, no_samp, replace=False)
            rand_feat_rand = rand_feat

            lock.acquire()
            val.value += 1
            #x = val.value
            lock.release()
            #print 'no of trees generated: {0}'.format(x)
            samp.append(notnan_idx_rand)
            feat.append(rand_feat_rand)

        else:
            #increase iter_outer counter if no sample subset could be found for random feature subset
            iter_outer += 1

        iter += 1
    if iter >= max_iter:
        print 'exiting worker{0} because iter >= max_iter'.format(worker_ID_local)
    else:
        print 'worker{0} - finished'.format(worker_ID_local)
    return samp, feat

def initialize(*args): 
    global val, worker_ID, lock 
    val, worker_ID, lock = args 

def star_findSamp(i_df_no_feat_no_samp): 
    return findSamp(*i_df_no_feat_no_samp) 


if __name__ == '__main__': 
    np.random.seed(43) 
    datafile = '...' 
    df = pd.read_csv(datafile, sep=" ", nrows = 89) 
    df = df.fillna(-1) 
    df = df.iloc[:, 6:] 

    no_feat = 700 
    no_samp = 10 
    no_trees = 5000 


    startTime = datetime.now() 
    print 'starting multiprocessing' 
    ncores = 4 
    p = Pool(ncores, initializer=initialize, initargs=(val, worker_ID, lock)) 
    args = itertools.izip([no_trees]*ncores, itertools.repeat(df), itertools.repeat(no_feat), itertools.repeat(no_samp)) 

    result = p.map(star_findSamp, args)#, callback=log_result) 
    p.close() 
    p.join() 

    print '{0} sample subsets for tree training have been found'.format(val.value) 

    samples = [x[0] for x in result if x is not None]
    samples = np.vstack(samples)
    features = [x[1] for x in result if x is not None]
    features = np.vstack(features)
    print datetime.now() - startTime