2015-07-22 96 views
1

Python: How do I run nested parallel processes in Python?

I have a dataset df of trader transactions. I have two levels of for loops, as follows:

smartTrader = []

for asset in range(len(Assets)):
    # Filter into a new name so that df is not overwritten between assets
    asset_df = df[df['Assets'] == asset]
    # I have some more calculations here
    for trader in range(len(asset_df['TraderID'])):
        # I have some calculations here. If the trader is successful, I add his ID
        # to the list as follows
        smartTrader.append(asset_df['TraderID'].iloc[trader])

    # some more calculations here which are related to the first for loop.

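I want to parallelise the calculations for each asset in Assets, and I also want to parallelise the calculations for each trader within every asset. After ALL of these calculations are done, I want to do additional analysis based on the smartTrader list.

This is my first attempt at parallel processing, so please be patient with me. Thank you for your help.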

+2

Try [`multiprocessing.Pool`](https://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers). – refi64

+0

I am not sure how to call this function when I have nested for loops. Could you please provide a small example? – roland

Answers

0

Instead of using for loops, use map:

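smartTrader = []

# calculations_as_a_function is assumed to take one asset's rows and
# return a list of the successful trader IDs for that asset
m = map(calculations_as_a_function,
        [df[df['Assets'] == asset]
         for asset in range(len(Assets))])
for trader_ids in m:
    smartTrader.extend(trader_ids)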

From there, you can try different parallel map implementations, such as multiprocessing's or stackless'.
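As a rough illustration of swapping map implementations, here is a minimal sketch using only the standard library. It assumes the trades are plain (asset, trader_id, profit) tuples rather than a DataFrame, and the names TRADES, is_successful, smart_traders_for_asset, group_by_asset and collect_smart_traders are all hypothetical, as is the profit threshold used as the success rule.

```python
from multiprocessing import Pool

# Hypothetical stand-in for the question's DataFrame: (asset, trader_id, profit) rows.
TRADES = [
    (0, "T1", 120.0),
    (0, "T2", -35.0),
    (1, "T1", 10.0),
    (1, "T3", 80.0),
]


def is_successful(profit):
    """Hypothetical success rule; replace with the real per-trader calculation."""
    return profit > 50.0


def smart_traders_for_asset(rows):
    """Inner loop: return the IDs of the successful traders for one asset."""
    return [trader_id for _, trader_id, profit in rows if is_successful(profit)]


def group_by_asset(trades):
    """Split the rows into one list per asset, ordered by asset number."""
    groups = {}
    for row in trades:
        groups.setdefault(row[0], []).append(row)
    return [groups[asset] for asset in sorted(groups)]


def collect_smart_traders(map_fn, trades):
    """Outer loop written as a map; map_fn can be the builtin map or pool.map."""
    smart_trader = []
    for ids in map_fn(smart_traders_for_asset, group_by_asset(trades)):
        smart_trader.extend(ids)
    return smart_trader


if __name__ == "__main__":
    # Serial run with the builtin map.
    print(collect_smart_traders(map, TRADES))
    # Parallel run: one task per asset, spread over two worker processes.
    with Pool(processes=2) as pool:
        print(collect_smart_traders(pool.map, TRADES))
```

Because the worker function is defined at module level and the pool is created under the `if __name__ == "__main__":` guard, the same script should also work on platforms where multiprocessing starts workers by spawning a fresh interpreter. Any analysis of the combined list can run after collect_smart_traders returns, since pool.map blocks until every asset has been processed.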

1

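If you use pathos, which provides a fork of multiprocessing, you can easily nest parallel maps. pathos is built for easily testing combinations of nested parallel maps, which are direct translations of nested for loops. It provides a selection of maps that are blocking, non-blocking, iterative, asynchronous, serial, parallel, and distributed.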

>>> from pathos.pools import ProcessPool, ThreadPool 
>>> amap = ProcessPool().amap 
>>> tmap = ThreadPool().map 
>>> from math import sin, cos 
>>> print(amap(tmap, [sin,cos], [range(10),range(10)]).get())
[[0.0, 0.8414709848078965, 0.9092974268256817, 0.1411200080598672, -0.7568024953079282, -0.9589242746631385, -0.27941549819892586, 0.6569865987187891, 0.9893582466233818, 0.4121184852417566], [1.0, 0.5403023058681398, -0.4161468365471424, -0.9899924966004454, -0.6536436208636119, 0.2836621854632263, 0.9601702866503661, 0.7539022543433046, -0.14550003380861354, -0.9111302618846769]] 

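This example uses a process pool and a thread pool, where the thread map call is blocking, while the process map call is asynchronous (note the get at the end of the last line).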

Get pathos here: https://github.com/uqfoundation or with: $ pip install git+https://github.com/uqfoundation/[email protected]

0

Threading, from the standard Python library, is probably the most convenient approach:

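import threading

def worker(trader_id):
    # Do your calculations here
    return

threads = []
for asset in range(len(Assets)):
    # Filter into a new name so that df is not overwritten between assets
    asset_df = df[df['Assets'] == asset]
    for trader in range(len(asset_df['TraderID'])):
        t = threading.Thread(target=worker, args=(trader,))
        threads.append(t)
        t.start()
    # Add a semaphore here if you need to synchronize results for all traders.

# Wait for every thread to finish before any follow-up analysis
for t in threads:
    t.join()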
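A bare threading.Thread does not hand back the worker's return value, so one possible variation is to use concurrent.futures from the standard library, which collects results and waits for completion. The sketch below is only an illustration: it assumes the trades are held in a plain dict of (trader_id, profit) pairs per asset instead of a DataFrame, and TRADES_BY_ASSET, check_trader, analyse_asset, find_smart_traders and the profit threshold are all hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the question's data: asset -> list of (trader_id, profit).
TRADES_BY_ASSET = {
    0: [("T1", 120.0), ("T2", -35.0)],
    1: [("T1", 10.0), ("T3", 80.0)],
}


def check_trader(trade):
    """Inner-loop body: return the trader ID if successful, otherwise None."""
    trader_id, profit = trade
    return trader_id if profit > 50.0 else None  # hypothetical success rule


def analyse_asset(trades, trader_pool):
    """Outer-loop body: check every trader of one asset on the inner pool."""
    results = trader_pool.map(check_trader, trades)
    return [trader_id for trader_id in results if trader_id is not None]


def find_smart_traders(trades_by_asset, workers=4):
    """Run assets on one thread pool and traders on a second, separate pool."""
    smart_trader = []
    with ThreadPoolExecutor(max_workers=workers) as asset_pool:
        with ThreadPoolExecutor(max_workers=workers) as trader_pool:
            futures = [
                asset_pool.submit(analyse_asset, trades, trader_pool)
                for _, trades in sorted(trades_by_asset.items())
            ]
            # result() blocks, so the list is complete once this loop ends.
            for future in futures:
                smart_trader.extend(future.result())
    return smart_trader


if __name__ == "__main__":
    print(find_smart_traders(TRADES_BY_ASSET))
```

Two separate pools are used so that asset tasks waiting on trader tasks cannot occupy every worker and stall the inner work. Note that in standard CPython the global interpreter lock means threads mainly help when the calculations wait on I/O or release the lock; for CPU-bound calculations a process pool is usually the better fit.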