2015-02-06 67 views
8

我有一個函數執行一些模擬,並且 返回一個字符串格式的數組。Python多處理 - 跟蹤pool.map操作的過程

我想運行 變化的輸入參數值,超過10000個可能的輸入值, 的仿真(功能),並將結果寫入單個文件。

我正在使用多處理,特別是pool.map函數 並行運行模擬。

由於運行仿真功能10000次以上的整個過程需要很長時間,我真的很想跟蹤整個操作過程。

我認爲我當前代碼中的問題是,pool.map運行10000次函數,在這些操作過程中沒有任何進程跟蹤。一旦並行處理完成10000次仿真(可能需要數小時至數天),那麼當10000個仿真結果保存到文件時,我會繼續跟蹤。因此,這並不是真正跟蹤pool.map操作的處理。

是否有一個容易修復我的代碼,將允許進程跟蹤?

def simFunction(input): 
    # Does some simulation and outputs simResult 
    return str(simResult) 

# Parallel processing 

inputs = np.arange(0,10000,1) 

if __name__ == "__main__": 
    numCores = multiprocessing.cpu_count() 
    pool = multiprocessing.Pool(processes = numCores) 
    t = pool.map(simFunction, inputs) 
    with open('results.txt','w') as out: 
     print("Starting to simulate " + str(len(inputs)) + " input values...") 
     counter = 0 
     for i in t: 
      out.write(i + '\n') 
      counter = counter + 1 
      if counter%100==0: 
       print(str(counter) + " of " + str(len(inputs)) + " input values simulated") 
    print('Finished!!!!') 

回答

7

如果使用迭代的map函數,可以很容易地跟蹤進度。您也可以使用異步map。在這裏,我會做一些不同的事情,只是混淆。

>>> import time 
>>> res = Pool().amap(simFunction, x,y) 
>>> while not res.ready(): 
... print "waiting..." 
... time.sleep(5) 
... 
waiting... 
waiting... 
waiting... 
waiting... 
>>> res.get() 
[-100, -97, -92, -85, -76, -65, -52, -37, -20, -1, 20, 43, 68, 95, 124, 155, 188, 223, 260, 299, 340, 383, 428, 475, 524, 575, 628, 683, 740, 799, 860, 923, 988, 1055, 1124, 1195, 1268, 1343, 1420, 1499, 1580, 1663, 1748, 1835, 1924, 2015, 2108, 2203, 2300, 2399, 2500, 2603, 2708, 2815, 2924, 3035, 3148, 3263, 3380, 3499, 3620, 3743, 3868, 3995, 4124, 4255, 4388, 4523, 4660, 4799, 4940, 5083, 5228, 5375, 5524, 5675, 5828, 5983, 6140, 6299, 6460, 6623, 6788, 6955, 7124, 7295, 7468, 7643, 7820, 7999, 8180, 8363, 8548, 8735, 8924, 9115, 9308, 9503, 9700, 9899] 

請注意,我使用pathos.multiprocessing代替multiprocessing。這只是multiprocessing的一個分支,它使您能夠利用多個輸入來執行map函數,具有更好的序列化,並允許您在任何地方(而不僅僅是在__main__)執行map調用。您也可以使用multiprocessing來完成上述操作,但代碼會略有不同。

迭代或異步map將使您能夠編寫任何代碼,以便執行更好的過程跟蹤。例如,將一個唯一的「id」傳遞給每個作業,並觀察哪些回來,或者讓每個作業返回它的進程ID。有很多方法可以跟蹤進度和流程...但上面的內容應該會爲您提供一個開始。

你可以pathos這裏:https://github.com/uqfoundation

+0

非常感謝你! – user32147 2015-02-26 17:08:45

3

沒有「簡單修復」。 map是關於隱藏你的實現細節。在這種情況下,你想要細節。就是說,根據定義,事情變得更加複雜一些。你需要改變通信範式。有很多方法可以這樣做。

一個是:創建一個隊列來收集您的結果,並讓您的工作人員將結果放入此隊列中。然後,您可以在監視線程或進程內查看隊列,並在進入時使用結果。在使用時,您可以分析它們並生成日誌輸出。這可能是跟蹤進度的最常用方式:您可以實時以任何方式響應傳入結果。

更簡單的方法可能是稍微修改您的輔助函數,並在那裏生成日誌輸出。通過使用外部工具仔細分析日誌輸出(例如grepwc),您可以想出很簡單的方法來跟蹤。

+1

謝謝。你能提供一些簡單的例子嗎? – user32147 2015-02-06 22:53:50

3

我想你需要的是一個日誌文件

我建議您使用日誌記錄模塊,它是Python標準庫的一部分。但不幸的是日誌記錄不是多處理安全的。所以你不能在你的應用程序中使用它。

因此,您將需要使用多處理安全的日誌處理程序,或者使用Queue或實現您的模塊或日誌模塊。

在Stackoverflow中有很多關於此的討論。這比如:How should I log while using multiprocessing in Python?

如果大多數CPU的負載是在模擬功能,你不打算使用日誌輪換,你也許可以用一個簡單的鎖定機制是這樣的:

import multiprocessing 
import logging 

from random import random 
import time 


logging.basicConfig(
    level=logging.DEBUG, 
    format='%(asctime)s %(process)s %(levelname)s %(message)s', 
    filename='results.log', 
    filemode='a' 
) 


def simulation(a): 
    # logging 
    with multiprocessing.Lock(): 
     logging.debug("Simulating with %s" % a) 

    # simulation 
    time.sleep(random()) 
    result = a*2 

    # logging 
    with multiprocessing.Lock(): 
     logging.debug("Finished simulation with %s. Result is %s" % (a, result)) 

    return result 

if __name__ == '__main__': 

    logging.debug("Starting the simulation") 
    inputs = [x for x in xrange(100)] 
    num_cores = multiprocessing.cpu_count() 
    print "num_cores: %d" % num_cores 
    pool = multiprocessing.Pool(processes=num_cores) 
    t = pool.map(simulation, inputs) 
    logging.debug("The simulation has ended") 

你可以運行時「tail -f」您的日誌文件。這是你應該看到的:

2015-02-08 18:10:00,616 3957 DEBUG Starting the simulation 
2015-02-08 18:10:00,819 3961 DEBUG Simulating with 12 
2015-02-08 18:10:00,861 3965 DEBUG Simulating with 28 
2015-02-08 18:10:00,843 3960 DEBUG Simulating with 20 
2015-02-08 18:10:00,832 3959 DEBUG Simulating with 16 
2015-02-08 18:10:00,812 3958 DEBUG Simulating with 8 
2015-02-08 18:10:00,798 3963 DEBUG Simulating with 4 
2015-02-08 18:10:00,855 3964 DEBUG Simulating with 24 
2015-02-08 18:10:00,781 3962 DEBUG Simulating with 0 
2015-02-08 18:10:00,981 3961 DEBUG Finished simulation with 12. Result is 24 
2015-02-08 18:10:00,981 3961 DEBUG Simulating with 13 
2015-02-08 18:10:00,991 3958 DEBUG Finished simulation with 8. Result is 16 
2015-02-08 18:10:00,991 3958 DEBUG Simulating with 9 
2015-02-08 18:10:01,130 3964 DEBUG Finished simulation with 24. Result is 48 
2015-02-08 18:10:01,131 3964 DEBUG Simulating with 25 
2015-02-08 18:10:01,134 3964 DEBUG Finished simulation with 25. Result is 50 
2015-02-08 18:10:01,134 3964 DEBUG Simulating with 26 
2015-02-08 18:10:01,315 3961 DEBUG Finished simulation with 13. Result is 26 
2015-02-08 18:10:01,316 3961 DEBUG Simulating with 14 
2015-02-08 18:10:01,391 3961 DEBUG Finished simulation with 14. Result is 28 
2015-02-08 18:10:01,391 3961 DEBUG Simulating with 15 
2015-02-08 18:10:01,392 3963 DEBUG Finished simulation with 4. Result is 8 
2015-02-08 18:10:01,393 3963 DEBUG Simulating with 5 

在Windows和Linux上試過。

希望這會有所幫助

+0

'multiprocessing.get_logger()'返回受鎖保護的特性受限記錄器,請參閱https://docs.python.org/2/library/multiprocessing.html#logging – 2015-02-09 00:07:36

+0

是的,但是這是模塊記錄器...所以你可以使用它,你的日誌將與模塊級消息混合在一起:嘗試它,你會看到這樣的消息:2015-02-08 23:47:10,954 9288 DEBUG創建帶句柄的semlock 448 – 2015-02-09 02:48:34

+0

哦,你對,我從來沒有真正使用過它,並且太快地瀏覽了文檔。 – 2015-02-09 13:04:49