2017-04-10 81 views
0

我有一個arcpy進程需要在一堆圖層上進行聯合,運行一些計算並編寫HTML報告。鑑於我需要生成的報告數量(約2,100個),我需要儘快完成此過程(我的目標是每個報告2秒)。當我遇到一個問題時,我嘗試了很多方法來做到這一點,包括多處理,也就是說,無論使用多少個內核,運行多進程部分的時間本質上都需要相同的時間。多核處理時間線性增加

例如,對於相同數量的報告:

  • 2芯花〜30秒每輪(所以40個報告需要40/2 * 30秒)
  • 4芯花〜60秒( 40/4 * 60)
  • 10芯花〜160秒(40/10 * 160)

等。它的工作時間相同,因爲每次翻動兩倍的時間需要兩次。

這是否意味着我的問題是I/O綁定,而不是CPU綁定?(如果是這樣 - 我該怎麼辦?)我認爲這是後者,因爲我的時間的大瓶頸是工會(它佔用了大約50%的處理時間)。在ArcGIS中,聯盟通常很昂貴,所以我認爲破解它並且一次運行2 - 10會快2到10倍。或者,可能我錯誤地實施了多進程?

## Worker function just included to give some context 

def worker(sub_code): 
    layer = 'in_memory/lyr_{}'.format(sub_code) 
    arcpy.Select_analysis(subbasinFC, layer, where_clause="SUB_CD = '{}'".format(sub_code)) 
    arcpy.env.extent = layer 
    union_name = 'in_memory/union_' + sub_code 

    arcpy.Union_analysis([fields], 
        union_name, 
        "NO_FID", "1 FEET") 
    #.......Some calculations using cursors 

    # Templating using Jinjah 
    context = {} 
    context['DATE'] = now.strftime("%B %d, %Y") 
    context['SUB_CD'] = sub_code 
    context['SUB_ACRES'] = sum([r[0] for r in arcpy.da.SearchCursor(union, ["ACRES"], where_clause="SUB_CD = '{}'".format(sub_code))]) 
    # Etc 

    # Then write the report out using custom function 
    write_html('template.html', 'output_folder', context) 


if __name__ == '__main__': 
    subList = sorted({r[0] for r in arcpy.da.SearchCursor(subbasinFC, ["SUB_CD"])}) 
    NUM_CORES = 7 
    chunk_list = [subList[i:i+NUM_CORES] for i in range(0, len(subList), NUM_CORES-1)] 
    for chunk in chunk_list: 
     jobs = [] 
     for subbasin in chunk: 
      p = multiprocessing.Process(target=worker, args=(subbasin,)) 
      jobs.append(p) 
      p.start() 

     for process in jobs: 
      process.join() 
+0

你的盒子有多少個核心? –

+0

8個物理16「邏輯處理器」 – HFBrowning

回答

3

這裏沒有多少東西可以介紹,我對ArcGIS沒有經驗。所以我只能注意到兩個更高層次的東西。首先,「通常的」方式來處理,這將是取代您NUM_CORES = 7下面所有的代碼:

pool = multiprocessing.Pool(NUM_CORES) 
pool.map(worker, subList) 
pool.close() 
pool.join() 

map()負責保持所有的工作進程儘可能忙的。原來,你啓動了7個進程,然後等待他們的全部完成。在最慢消失之前完成的所有進程,以及它們的核心處於空閒狀態,等待下一個外部循環迭代。 A Pool可以讓這7個流程在整個工作期間保持活躍狀態​​,並在每完成一項工作後立即爲每一項新工作提供幫助。

其次,這部分有邏輯錯誤結束:

chunk_list = [subList[i:i+NUM_CORES] for i in range(0, len(subList), NUM_CORES-1)] 

你想NUM_CORES存在,而不是NUM_CORES-1。作爲-是,第一時間在你身邊提取

subList[0:7] 

然後

subList[6:13] 

然後

subList[12:19] 

等。 subList[6]subList[12](等)每個提取兩次。子列表重疊。

+0

+1,謝謝。我的問題(和實際情況)是一團糟,因爲我的代碼同時出現了太多的錯誤。接受是因爲圍繞「Process」玩弄我相信,等待所有進程完成的開銷是造成奇怪減速的原因。使用'Pool'開始真的很強大 - 但我一直在避免它,因爲它最終崩潰。我不認爲我在刪除記憶中的東西方面做得很好。當我知道所有細節時,我會回來更新我的問題 - 也許這樣可能會更好地幫助別人。 – HFBrowning

+1

嗯。需要嘗試的方法:將'maxtasksperchild = 1'添加到'Pool'構造函數,並將'chunksize = 1'添加到'map()'調用。這將強制爲每個工作項目創建一個全新的流程。雖然這並不能「解決」任何事情,但在某些情況下,它可以有效地隱藏其他問題,至少可以取得進展;-) –

0

我不確定您是否正確使用Process池來跟蹤您的工作。這:

for subbasin in chunk: 
    p = multiprocessing.Process(target=worker, args=(subbasin,)) 
    jobs.append(p) 
    p.start() 

    for process in jobs: 
     process.join() 

應改爲:

for subbasin in chunk: 
    p = multiprocessing.Process(target=worker, args=(subbasin,)) 
    p.start() 
    p.join() 

有沒有你要對spec of using the multiprocessing library具體原因是什麼?在等待線程終止之前,您不會等待,直到它將創建一大堆父進程不處理的進程。

+0

如果OP按照您的建議進行操作,它們將保證不會有任何並行性:第二個循環啓動一個進程,並在循環進入下一個進程之前等待它完成處理。它只是連續運行一個進程。 –

+0

@TimPeters:同意 - 根據您在響應中建議的內核數量,應該有另一個父循環。 OP的代碼沒有考慮到這一點,也沒有遵循lib規範。 – tatlar

+0

我在OP的'Process'中沒有看到任何錯誤。他們的代碼同時觸發'NUM_CORES'個進程,等待它們完成,然後循環獲取下一個'NUM_CORES'個工作項。沒關係。我只建議使用'map()',因爲它更容易完成所有操作,並且以更有效的方式(保持所有工作進程儘可能繁忙)。什麼 - 非常具體 - 你認爲不符合規範?同時啓動儘可能多的流程是絕對合理的(儘管在某些時候會變得適得其反)。 –

1

你不足以確定你在做什麼。例如,你的env.workspace是什麼? subbasinFC的值是多少?您似乎在每個過程開始時都要進行分析,將數據過濾爲layer。但是subbasinFC來自磁盤或內存?如果它來自磁盤,我建議你在任何進程嘗試過濾之前將所有內容都讀入內存。如果你有足夠的內存來支持它,這應該會加快速度。否則,是的,你對輸入數據進行I/O綁定。

原諒我的arcpy無知,但爲什麼你在你的總和context['SUB_ACRES']插入一個where子句?您是不是已經在開始時過濾了sub_code? (我們不知道工會是什麼,所以也許你會和未經過濾的東西結合......)

+0

+1,因爲從磁盤觀察中讀取數據。你是對的 - 我將所有的讀取操作從磁盤改爲內存,並將基本工作器功能提高了3倍(連續運行需要90秒,現在需要30秒)。謝謝!這本身就是一個了不起的改進 – HFBrowning