2017-02-26 185 views
1

我正在嘗試啓動可變數量的線程來計算我的自動交易模塊之一的函數結果。我有大約14個函數,所有這些函數在計算上都很昂貴。我一直按順序計算每個函數,但大約需要3分鐘才能完成,而且我的平臺頻率很高,我需要將計算時間縮短到1分鐘或更短。Python的多功能多線程與線程。線程? (可變線程數)

我讀過多處理和多線程,但我找不到適合我需求的解決方案。

我想要做的是定義「n」個線程使用,然後將我的函數列表劃分爲「n」個組,然後在單獨的線程中計算每組函數。基本上:

functionList = [func1,func2,func3,func4] 
outputList = [func1out,func2out,func3out,func4out] 
argsList = [func1args,func2args,func3args,func4args] 

# number of threads 
n = 3 

functionSplit = np.array_split(np.array(functionList),n) 
outputSplit = np.array_split(np.array(outputList),n) 
argSplit = np.array_split(np.array(argsList),n) 

現在我想開始「n」單獨的線程,每個線程根據拆分列表處理功能。然後,我想根據outputList命名每個函數的輸出,併爲每個函數的輸出創建一個主碼。然後我將遍歷輸出字典並根據每列中的信息創建一個帶有列ID號的數據框(已經有了這部分工作,只需要多線程)。

有沒有辦法做到這樣的事情?我一直在研究創建threading.Thread類的子類並將函數,輸出名稱和參數傳遞給run()方法,但我不知道如何命名和輸出每個線程的函數結果!我也不知道如何根據相應的參數在列表中調用函數!

我這樣做的原因是爲了發現計算效率和時間之間最佳的線程數量平衡。就像我說的,這將被整合到我正在開發的高頻交易平臺中,時間是我的主要限制!

任何想法?

+1

如果你的函數是CPU限制的,那麼忘記多線程並查看多處理(至少對於CPython)。 – Bakuriu

+0

我現在意識到多處理是要走的路。任何有關如何使用'threading'中的'process'模塊執行此操作的指針? – denbjornen505

+1

'threading'是錯誤的模塊。使用'multiprocessing'。 'Pool.map'就是你想要的。 – Tyler

回答

1

您可以使用multiprocessing庫像下面

import multiprocessing 

def callfns(fnList, argList, outList, d): 
    for i in range(len(fnList)): 
     d[somekey] = fnList[i](argList, outList) 

... 

manager = multiprocessing.Manager() 
d = manager.dict() 
processes = [] 
for i in range(len(functionSplit)): 
    process = multiprocessing.Process(target=callfns, args=(functionSplit[i], argSplit[i], outputSplit[i], d)) 
    processes.append(process) 

for j in processes: 
    j.start() 

for j in processes: 
    j.join() 

# use d here 

您可以使用一個服務器進程共享這些過程之間的字典。要與服務器進程交互,您需要Manager。然後你可以在服務器進程manager.dict()中創建一個字典。一旦所有進程回到主進程,您可以使用字典d

我希望這可以幫助你解決你的問題。

+0

欲瞭解更多[這裏](https://shreyash14s.wordpress.com/2017/02/19/parallelism-in-python/) –

+0

謝謝!我的每個函數都會輸出一個數據幀,有沒有什麼方法可以將每個函數的輸出轉換爲字典?我會在'callfuns'函數中執行此操作並返回每個段的字典嗎?只是不知道這將如何與多個進程,如果他們在不同的時間完成。你可以編輯你的答案,包括這個? – denbjornen505

+0

@ denbjornen505我已經更新了我的答案,正如你所說的。 –

1
  • 您應該使用multiprocessing而不是線程來執行cpu綁定任務。

  • 手動創建和管理流程可能很困難,需要更多的努力。檢查concurrent.futures並嘗試使用ProcessPool來維護一組進程。您可以向他們提交任務並檢索結果。

  • Pool.map方法從multiprocessing模塊可以採取一個函數和iterable,然後並行處理它們塊並行計算更快。迭代被分解成單獨的塊。這些塊在不同的進程中傳遞給函數。然後將結果放回到一起。