My original question was about parallelism in Python. Since the question remained unanswered, I deleted it and will try to summarize my conclusions here instead. Hopefully this helps somebody... Multiprocessing in Python is slower than a single thread.
In general, there are two main ways to make your code run in parallel: the threading library or the multiprocessing library.
According to many posts on stackoverflow.com, the threading library is able to share memory between threads efficiently, but the threads run on a single core (in CPython, because of the Global Interpreter Lock). It can therefore speed up your code if the bottleneck is an I/O operation. I am not sure whether the library has many real-life applications beyond that...
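To illustrate the I/O case, here is a minimal sketch (not part of the original experiment; the function fake_io_task is made up for illustration) in which four half-second waits overlap and finish in roughly 0.5 seconds instead of 2 seconds:

import threading
import time as tm

def fake_io_task(delay):
    """ stand-in for a blocking I/O call (file read, network request, ...) """
    tm.sleep(delay)

start_time = tm.time()
threads = [threading.Thread(target = fake_io_task, args = (0.5,)) for _ in range(4)]
[thread.start() for thread in threads]
[thread.join() for thread in threads]
# prints roughly 0.5s, not 2s, because the waits overlap
print(' elapsed time: ' + str(tm.time() - start_time) + 's')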
If your code is CPU-intensive (sometimes called CPU-bound), the multiprocessing library could be the answer to your problem. This library spreads the work over the individual cores. However, many people (including me) have observed that such multicore code can be significantly slower than its single-core counterpart. The problem is caused by the fact that the individual worker processes cannot share memory efficiently: the data is copied back and forth, which creates considerable overhead. As the code below demonstrates, the overhead depends heavily on the input data type. The problem is more pronounced on Windows than on Linux. I have to say that parallelism is my biggest Python disappointment; apparently Python was not designed with parallelism in mind...
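On my understanding, a large part of that overhead is the pickling of the arguments on their way to the worker processes. A minimal sketch (not part of the measurements below) that times the serialization alone for the two data types used in this post:

import pickle
import time as tm
import numpy as np
import pandas as pd

arr = np.array([100.0] * 1200)
df = pd.DataFrame({'currency_name' : 'EUR', 'npv' : 100.0}, index = range(1200))

for name, obj in [('numpy array', arr), ('pandas DataFrame', df)]:
    start_time = tm.time()
    for _ in range(1000):
        pickle.dumps(obj, protocol = pickle.HIGHEST_PROTOCOL)
    print(name + ': ' + str(tm.time() - start_time) + 's for 1000 pickles')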
The first piece of code distributes a pandas DataFrame among the cores using Process.
import numpy as np
import math as mth
import pandas as pd
import time as tm
import multiprocessing as mp

def bnd_calc_npv_dummy(bnds_info, core_idx, npv):
    """ multiple core dummy valuation function (based on single core function) """
    bnds_no = len(bnds_info)
    tm.sleep(0.0001 * bnds_no)
    npv[core_idx] = np.array(bnds_info['npv'])

def split_bnds_info(bnds_info, cores_no):
    """ cut dataframe with bond definitions into pieces - one piece per core """
    bnds_info_mp = []
    bnds_no = len(bnds_info)
    batch_size = mth.ceil(np.float64(bnds_no) / cores_no)  # number of bonds allocated to one core
    # split dataframe among cores
    for idx in range(cores_no):
        lower_bound = int(idx * batch_size)
        upper_bound = int(np.min([(idx + 1) * batch_size, bnds_no]))
        bnds_info_mp.append(bnds_info[lower_bound : upper_bound].reset_index().copy())
    # return list of dataframes
    return bnds_info_mp

def bnd_calc_npv(bnds_info, cores_no):
    """ dummy valuation function running multicore """
    manager = mp.Manager()
    npv = manager.dict()
    bnds_info_mp = split_bnds_info(bnds_info, cores_no)
    processes = [mp.Process(target = bnd_calc_npv_dummy, args = (bnds_info_mp[core_idx], core_idx, npv)) for core_idx in range(cores_no)]
    [process.start() for process in processes]
    [process.join() for process in processes]
    # return NPV of individual bonds (keyed by core index, since a Manager dict does not guarantee order)
    return np.hstack([npv[core_idx] for core_idx in range(cores_no)])

if __name__ == '__main__':
    # create dummy dataframe
    bnds_no = 1200  # number of dummy bonds in the sample
    bnds_info = {'currency_name' : 'EUR', 'npv' : 100}
    bnds_info = pd.DataFrame(bnds_info, index = range(1))
    bnds_info = pd.concat([bnds_info] * bnds_no, ignore_index = True)

    # one core
    print("ONE CORE")
    start_time = tm.time()
    bnds_no = len(bnds_info)
    tm.sleep(0.0001 * bnds_no)
    npv = np.array(bnds_info['npv'])
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # two cores
    print("TWO CORES")
    cores_no = 2
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # three cores
    print("THREE CORES")
    cores_no = 3
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # four cores
    print("FOUR CORES")
    cores_no = 4
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')
The second piece of code is the same as the previous one; the only difference is that this time we use a numpy array instead of a pandas DataFrame, and the difference in performance is huge (compare how the runtime changes when going multicore with the single-core runtime).
import numpy as np
import math as mth
import time as tm
import multiprocessing as mp

def bnd_calc_npv_dummy(bnds_info, core_idx, npv):
    """ multiple core dummy valuation function (based on single core function) """
    bnds_no = len(bnds_info)
    tm.sleep(0.0001 * bnds_no)
    npv[core_idx] = bnds_info

def split_bnds_info(bnds_info, cores_no):
    """ cut array with bond definitions into pieces - one piece per core """
    bnds_info_mp = []
    bnds_no = len(bnds_info)
    batch_size = mth.ceil(np.float64(bnds_no) / cores_no)  # number of bonds allocated to one core
    # split array among cores
    for idx in range(cores_no):
        lower_bound = int(idx * batch_size)
        upper_bound = int(np.min([(idx + 1) * batch_size, bnds_no]))
        bnds_info_mp.append(bnds_info[lower_bound : upper_bound])
    # return list of arrays
    return bnds_info_mp

def bnd_calc_npv(bnds_info, cores_no):
    """ dummy valuation function running multicore """
    manager = mp.Manager()
    npv = manager.dict()
    bnds_info_mp = split_bnds_info(bnds_info, cores_no)
    processes = [mp.Process(target = bnd_calc_npv_dummy, args = (bnds_info_mp[core_idx], core_idx, npv)) for core_idx in range(cores_no)]
    [process.start() for process in processes]
    [process.join() for process in processes]
    # return NPV of individual bonds (keyed by core index, since a Manager dict does not guarantee order)
    return np.hstack([npv[core_idx] for core_idx in range(cores_no)])

if __name__ == '__main__':
    # create dummy array
    bnds_no = 1200  # number of dummy bonds in the sample
    bnds_info = np.array([100] * bnds_no)

    # one core
    print("ONE CORE")
    start_time = tm.time()
    bnds_no = len(bnds_info)
    tm.sleep(0.0001 * bnds_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # two cores
    print("TWO CORES")
    cores_no = 2
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # three cores
    print("THREE CORES")
    cores_no = 3
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # four cores
    print("FOUR CORES")
    cores_no = 4
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')
The last piece of code uses Pool instead of Process. The runtime is slightly better.
import numpy as np
import time as tm
import multiprocessing as mp

def bnd_calc_npv_dummy(bnds_info):
    """ multiple core dummy valuation function (based on single core function) """
    try:
        # get number of bonds
        bnds_no = len(bnds_info)
    except TypeError:
        # a single float has no len() - treat it as one bond
        bnds_no = 1
    tm.sleep(0.0001 * bnds_no)
    return bnds_info

def bnd_calc_npv(bnds_info, cores_no):
    """ dummy valuation function running multicore """
    pool = mp.Pool(processes = cores_no)
    npv = pool.map(bnd_calc_npv_dummy, bnds_info.tolist())
    pool.close()
    pool.join()
    # return NPV of individual bonds
    return npv

if __name__ == '__main__':
    # create dummy array
    bnds_no = 1200  # number of dummy bonds in the sample
    bnds_info = np.array([100.0] * bnds_no)

    # one core
    print("ONE CORE")
    start_time = tm.time()
    bnds_no = len(bnds_info)
    tm.sleep(0.0001 * bnds_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # two cores
    print("TWO CORES")
    cores_no = 2
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # three cores
    print("THREE CORES")
    cores_no = 3
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # four cores
    print("FOUR CORES")
    cores_no = 4
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')
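One more variant that might reduce the overhead (a sketch, not benchmarked on the setup above): instead of mapping over the 1200 bonds one by one, hand each worker one whole batch via np.array_split, so that only cores_no tasks have to be pickled and dispatched.

import numpy as np
import time as tm
import multiprocessing as mp

def bnd_calc_npv_batch(bnds_info):
    """ dummy valuation of one whole batch of bonds """
    tm.sleep(0.0001 * len(bnds_info))
    return bnds_info

if __name__ == '__main__':
    bnds_info = np.array([100.0] * 1200)
    cores_no = 4
    start_time = tm.time()
    pool = mp.Pool(processes = cores_no)
    # one chunk per worker instead of one task per bond
    npv = np.hstack(pool.map(bnd_calc_npv_batch, np.array_split(bnds_info, cores_no)))
    pool.close()
    pool.join()
    print(' elapsed time: ' + str(tm.time() - start_time) + 's')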
So, my conclusion is that Python's implementation of parallelism is not applicable to real-life problems (I used Python 2.7.13 and Windows 7). Best regards,
麥基
PS: If somebody could improve my code, I would be more than happy to change my mind...
Without even reading it: multiprocessing done wrong is slower than a single process. Consider creating a [Minimal, Complete, and Verifiable example](https://stackoverflow.com/help/mcve). –
For I/O-bound tasks (e.g. reading from or writing to files), you should consider using the 'threading' module instead of the 'multiprocessing' module. Multiprocessing is more effective for CPU-bound tasks. – ettanany