0

我如何實現對我的函數的多處理。我嘗試這樣做,但沒有奏效。對python函數進行多處理

def steric_clashes_parallel(system): 
    rna_st = system[MolWithResID("G")].molecule() 
    for i in system.molNums(): 
     peg_st = system[i].molecule() 
     if rna_st != peg_st: 
      print(peg_st) 
      for i in rna_st.atoms(AtomIdx()): 
       for j in peg_st.atoms(AtomIdx()): 
#     print(Vector.distance(i.evaluate().center(), j.evaluate().center())) 
        dist = Vector.distance(i.evaluate().center(), j.evaluate().center()) 
        if dist<2: 
         return print("there is a steric clash") 
    return print("there is no steric clashes") 

mix = PDB().read("clash_1.pdb") 
system = System() 
system.add(mix)  
from multiprocessing import Pool 
p = Pool(4) 
p.map(steric_clashes_parallel,system) 

我有一千個pdb或系統文件來測試通過這個函數。在沒有多處理模塊的情況下,單個內核上的一個文件需要2小時。任何建議都會有很大的幫助。

我回溯看起來是這樣的:

self.run() 
File "/home/sajid/sire.app/bundled/lib/python3.3/threading.py", line 858, 
    in run self._target(*self._args, **self._kwargs) 
    File "/home/sajid/sire.app/bundled/lib/python3.3/multiprocessing/pool.py", line 351, 
     in _handle_tasks put(task) 
     File "/home/sajid/sire.app/bundled/lib/python3.3/multiprocessing/connection.py", line 206, 
      in send ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj) 
RuntimeError: Pickling of "Sire.System._System.System" instances is not enabled 
(boost.org/libs/python/doc/v2/pickle.html) 
+0

「沒有工作」是不夠的...你得到什麼錯誤?什麼是「PDB」和「System()」?我們並不都是空談的人。 – tdelaney

+0

對不起,我在多處理模塊上沒有足夠的背景。在運行腳本我得到了以下輸出; –

+0

self.run() 文件「/home/sajid/sire.app/bundled/lib/python3.3/threading.py」,行858,運行 self._target(* self._args,** self。 _kwargs) 文件「/home/sajid/sire.app/bundled/lib/python3.3/multiprocessing/pool.py」,第351行,在_handle_tasks put(任務) 文件「/home/sajid/sire.app /bundled/lib/python3.3/multiprocessing/connection.py「,行206,發送 ForkingPickler(buf,pickle.HIGHEST_PROTOCOL).dump(obj) RuntimeError:酸洗」Sire.System._System.System「實例沒有啓用(http://www.boost.org/libs/python/doc/v2/pickle.html) –

回答

3

的問題是,Sire.System._System.System不能序列,因此它不能被髮送到子進程。多處理使用pickle模塊進行序列化,您可以經常在主程序中通過pickle.dumps(my_mp_object)進行完整性檢查以進行驗證。

你有另一個問題,雖然(或者我認爲你是基於變量名)。 map方法需要迭代並將其迭代對象放到池成員中,但看起來您要處理system本身,而不是迭代的東西。

多處理的一個訣竅就是讓您從父級發送給子級的有效負載變得簡單,並讓子級負責創建其對象。在這裏,您最好只發送文件名並讓孩子們完成大部分工作。

def steric_clashes_from_file(filename): 
    mix = PDB().read(filename) 
    system = System() 
    system.add(mix)  
    steric_clashes_parallel(system) 

def steric_clashes_parallel(system): 
    rna_st = system[MolWithResID("G")].molecule() 
    for i in system.molNums(): 
     peg_st = system[i].molecule() 
     if rna_st != peg_st: 
      print(peg_st) 
      for i in rna_st.atoms(AtomIdx()): 
       for j in peg_st.atoms(AtomIdx()): 
#     print(Vector.distance(i.evaluate().center(), j.evaluate().center())) 
        dist = Vector.distance(i.evaluate().center(), j.evaluate().center()) 
        if dist<2: 
         return print("there is a steric clash") 
    return print("there is no steric clashes") 

filenames = ["clash_1.pdb",] 
from multiprocessing import Pool 
p = Pool(4, chunksize=1) 
p.map(steric_clashes_from_file,filenames) 
+0

@馬蒂諾 - 是的,謝謝你指出。 – tdelaney

+0

我測試了泡菜命令,它給了我; –

0

@ martineau: 我測試了pickle命令,它給了我;

----> 1 pickle.dumps(clash_1.pdb) 
    RuntimeError: Pickling of "Sire.Mol._Mol.MoleculeGroup" instances is not enabled (http://www.boost.org/libs/python/doc/v2/pickle.html) 
    ----> 1 pickle.dumps(system) 
    RuntimeError: Pickling of "Sire.System._System.System" instances is not enabled (http://www.boost.org/libs/python/doc/v2/pickle.html) 

用你的腳本,它只用了一個單一的核心。 dist線雖然可迭代。我可以在多線運行這條單線嗎?我將該行修改爲;

for i in rna_st.atoms(AtomIdx()): 
        icent = i.evaluate().center() 
        for j in peg_st.atoms(AtomIdx()): 
         dist = Vector.distance(icent, j.evaluate().center()) 
0

有一個技巧可以讓每個文件獲得更快的計算 - 按順序處理每個文件,但並行處理文件的內容。這取決於一些注意事項:

  1. 您正在運行一個可以分叉進程的系統(如Linux)。
  2. 您正在進行的計算沒有影響未來計算結果的副作用。

看來你的情況就是這樣,但我不能100%確定。

當一個進程分叉時,子進程中的所有內存都是從父進程中複製的(更重要的是它以高效的方式複製 - 只讀的內存位不會被複制)。這使得在進程之間共享大而複雜的初始狀態變得很容易。但是,一旦子進程已經開始,他們將不會看到父進程中所做對象的任何更改(反之亦然)。

示例代碼:

import multiprocessing 

system = None 
rna_st = None 

class StericClash(Exception): 
    """Exception used to halt processing of a file. Could be modified to 
    include information about what caused the clash if this is useful.""" 
    pass 


def steric_clashes_parallel(system_index): 
    peg_st = system[system_index].molecule() 
    if rna_st != peg_st: 
     for i in rna_st.atoms(AtomIdx()): 
      for j in peg_st.atoms(AtomIdx()): 
       dist = Vector.distance(i.evaluate().center(), 
        j.evaluate().center()) 
       if dist < 2: 
        raise StericClash() 


def process_file(filename): 
    global system, rna_st 

    # initialise global values before creating pool  
    mix = PDB().read(filename) 
    system = System() 
    system.add(mix) 
    rna_st = system[MolWithResID("G")].molecule() 

    with multiprocessing.Pool() as pool: 
     # contents of file processed in parallel 
     try: 
      pool.map(steric_clashes_parallel, range(system.molNums())) 
     except StericClash: 
      # terminate called to halt current jobs and further processing 
      # of file 
      pool.terminate() 
      # wait for pool processes to terminate before returning 
      pool.join() 
      return False 
     else: 
      pool.close() 
      pool.join() 
      return True 
     finally: 
      # reset globals 
      system = rna_st = None 

if __name__ == "__main__": 
    for filename in get_files_to_be_processed(): 
     # files are being processed in serial 
     result = process_file(filename) 
     save_result_to_disk(filename, result) 
+0

需要注意的是,這適用於分叉進程保留父內存的Linux,但不適用於在沒有這種共享狀態的情況下創建新進程的Windows。 – tdelaney