Using concurrent.futures.ProcessPoolExecutor to run simultaneous and independent ABAQUS models

I want to run a total of nAnalysis = 25 Abaqus models, each using X cores, and I can only run nParallelLoops = 5 of these models at a time. As soon as one of the 5 analyses currently running finishes, another one should start, until all nAnalysis are completed.

I implemented the code below using a combination of the solutions posted in:

  1. Using Python's Multiprocessing module to execute simultaneous and separate SEAWAT/MODFLOW model runs
  2. How to parallelize this nested loop in Python that calls Abaqus

However, I am missing something, because all nAnalysis analyses try to start "at once", the code deadlocks and no analysis ever completes, since many of them may want to use the same cores as an analysis that has already started.

def runABQfile(*args):  
    import subprocess 
    import os 

    inpFile,path,jobVars = args 

    prcStr1 = (path+'/runJob.sh') 

    process = subprocess.check_call(prcStr1, stdin=None, stdout=None, stderr=None, shell=True, cwd=path) 

def safeABQrun(*args):
    import os

    try:
        runABQfile(*args)
    except Exception as e:
        print("Thread Error: %s runABQfile(*%r)" % (e, args))

def errFunction(ppos, *args):
    import os
    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures import as_completed
    from concurrent.futures import wait

    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(0, nAnalysis))  # 5 nodes
        wait(future_to_file, timeout=None, return_when='ALL_COMPLETED')
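
For reference, the snippets above and below rely on a few module-level names that are not shown. They are assumed to look roughly like the following (the values here are purely illustrative placeholders, not what my real script uses):

nAnalysis = 25          # total number of Abaqus models to run
nParallelLoops = 5      # how many models may run at the same time
inpFiles = ['Job-%d.inp' % (k + 1) for k in range(nAnalysis)]                # hypothetical input file names
aPath = ['/scratch/user/analysis_%02d' % (k + 1) for k in range(nAnalysis)]  # hypothetical working directories
jobVars = 'default'     # placeholder for whatever job settings runJob.sh expects

if __name__ == '__main__':
    errFunction(0)      # ppos is not used inside errFunction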

The only way I have been able to run anything so far is by modifying errFunction to use exactly 5 analyses at a time, as shown below. However, this approach sometimes results in one analysis in each group taking much longer than the other 4 (per ProcessPoolExecutor call), so the next group of 5 does not start even though resources (cores) are available. Ultimately this means more time to complete all 25 models.

def errFunction(ppos, *args):
    import os
    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures import as_completed
    from concurrent.futures import wait

    # Group 1
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(0, 5))  # 5 nodes
        wait(future_to_file, timeout=None, return_when='ALL_COMPLETED')

    # Group 2
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(5, 10))  # 5 nodes
        wait(future_to_file, timeout=None, return_when='ALL_COMPLETED')

    # Group 3
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(10, 15))  # 5 nodes
        wait(future_to_file, timeout=None, return_when='ALL_COMPLETED')

    # Group 4
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(15, 20))  # 5 nodes
        wait(future_to_file, timeout=None, return_when='ALL_COMPLETED')

    # Group 5
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(20, 25))  # 5 nodes
        wait(future_to_file, timeout=None, return_when='ALL_COMPLETED')

I tried using the as_completed function, but it does not seem to work.
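
To make the intent concrete, the behavior I am after is roughly the sketch below (same names as above, shown only for illustration): submit all nAnalysis jobs to a pool of nParallelLoops workers and let the pool pick up the next analysis as soon as one finishes.

def errFunction(ppos, *args):
    from concurrent.futures import ProcessPoolExecutor, as_completed

    # sketch only: keep nParallelLoops analyses in flight until all nAnalysis have run
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        futures = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(nAnalysis))
        for f in as_completed(futures):
            print("Analysis %d finished" % futures[f])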

Could you please help me figure out the right way to parallelize this, so that I can run all nAnalysis analyses while always keeping nParallelLoops of them running simultaneously? Your help is appreciated. I am using Python 2.7.

Best regards, David P.


UPDATE JULY 30/2016

I introduced a loop in safeABQrun to manage 5 different "queues". The loop is necessary to avoid the situation where an analysis tries to run on a node while another one is still running there. The analyses are pre-configured to run on one of the requested nodes before any actual analysis starts.

def safeABQrun(*list_args):
    import os

    inpFiles, paths, jobVars = list_args

    nA = len(inpFiles)
    for k in range(0, nA):
        args = (inpFiles[k], paths[k], jobVars[k])
        try:
            runABQfile(*args)  # actual run function
        except Exception as e:
            print("Thread Error: %s runABQfile(*%r)" % (e, args))

def errFunction(ppos, *args):
    from concurrent.futures import ProcessPoolExecutor, as_completed

    # list_args is assumed to hold one (inpFiles, paths, jobVars, k) tuple per queue
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        futures = dict((executor.submit(safeABQrun, inpF, aPth, jVrs), k) for inpF, aPth, jVrs, k in list_args)  # 5 nodes

        for f in as_completed(futures):
            print("|=== Finish Process Train %d ===|" % futures[f])
            if f.exception() is not None:
                print('%r generated an exception: %s' % (futures[f], f.exception()))
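
For completeness, the per-queue argument list can be assembled along these lines (a sketch only; jobVarsList is a hypothetical per-analysis list of job settings, and the even slicing shown here is just one way to split 25 analyses into 5 queues):

jobVarsList = ['default'] * nAnalysis     # hypothetical per-analysis job settings
nQueues = nParallelLoops                  # assumed: one "queue" per parallel slot
chunk = nAnalysis // nQueues              # 25 analyses split into 5 queues of 5
list_args = []
for k in range(nQueues):
    lo, hi = k * chunk, (k + 1) * chunk
    # each queue gets its own slice of input files, working directories and job settings
    list_args.append((inpFiles[lo:hi], aPath[lo:hi], jobVarsList[lo:hi], k))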

Answer


It looks OK to me, but I can't run your code. How about trying something vastly simpler first, then adding things until "the problem" appears? For example, does the following show the kind of behavior you want? It does on my machine, but I'm running Python 3.5.2. You say you're running 2.7, but concurrent.futures didn't exist in Python 2, so if you are on 2.7 you must be running someone's backport of the library, and perhaps the problem lies there. Trying the following should help answer whether that's the case:

from concurrent.futures import ProcessPoolExecutor, wait, as_completed 

def worker(i): 
    from time import sleep 
    from random import randrange 
    s = randrange(1, 10) 
    print("%d started and sleeping for %d" % (i, s)) 
    sleep(s) 

if __name__ == "__main__": 
    nAnalysis = 25 
    nParallelLoops = 5 
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor: 
        futures = dict((executor.submit(worker, k), k) for k in range(nAnalysis))
        for f in as_completed(futures):
            print("got %d" % futures[f])

Typical output:

0 started and sleeping for 4 
1 started and sleeping for 1 
2 started and sleeping for 1 
3 started and sleeping for 6 
4 started and sleeping for 5 
5 started and sleeping for 9 
got 1 
6 started and sleeping for 5 
got 2 
7 started and sleeping for 6 
got 0 
8 started and sleeping for 6 
got 4 
9 started and sleeping for 8 
got 6 
10 started and sleeping for 9 
got 3 
11 started and sleeping for 6 
got 7 
12 started and sleeping for 9 
got 5 
... 
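
If you really are on 2.7, it is also worth checking which concurrent.futures you are actually importing (presumably a backport such as the futures package from PyPI). Printing the module path makes it obvious whether a third-party copy is in use:

import concurrent.futures
print(concurrent.futures.__file__)  # a site-packages path means a backport, not the standard library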