2014-02-26 39 views
1

我有一個複雜的python管道(我不能改變這個代碼),調用多個其他腳本和其他可執行文件。關鍵是要花費很長時間來運行8000多個目錄,進行一些科學分析。所以,我寫了一個簡單的包裝器,(可能不是最有效的,但似乎工作)使用多處理模塊。多個python進程之間的os.chdir

from os import path, listdir, mkdir, system 
from os.path import join as osjoin, exists, isfile 
from GffTools import Gene, Element, Transcript 
from GffTools import read as gread, write as gwrite, sort as gsort 
from re import match 
from multiprocessing import JoinableQueue, Process 
from sys import argv, exit 

# some absolute paths 
inbase = "/.../abfgp_in" 
outbase = "/.../abfgp_out" 
abfgp_cmd = "python /.../abfgp-2.rev/abfgp.py" 
refGff = "/.../B0510_manual_reindexed_noSeq.gff" 

# the Queue 
Q = JoinableQueue() 
i = 0 

# define number of processes 
try: num_p = int(argv[1]) 
except ValueError: exit("Wrong CPU argument") 

# This is the function calling the abfgp.py script, which in its turn calls alot of third party software 
def abfgp(id_, pid): 
    out = osjoin(outbase, id_) 
    if not exists(out): mkdir(out) 

    # logfile 
    log = osjoin(outbase, "log_process_%s" %(pid)) 
    try: 
     # call the script 
     system("%s --dna %s --multifasta %s --target %s -o %s -q >>%s" %(abfgp_cmd, osjoin(inbase, id_, id_ +".dna.fa"), osjoin(inbase, id_, "informants.mfa"), id_, out, log)) 
    except: 
     print "ABFGP FAILED" 
     return 

# parse the output 
def extractGff(id_): 
    # code not relevant 


# function called by multiple processes, using the Queue 
def run(Q, pid): 
    while not Q.empty(): 
     try: 
      d = Q.get()    
      print "%s\t=>>\t%s" %(str(i-Q.qsize()), d)   
      abfgp(d, pid) 
      Q.task_done() 
     except KeyboardInterrupt: 
      exit("Interrupted Child") 

# list of directories 
genedirs = [d for d in listdir(inbase)] 
genes = gread(refGff) 
for d in genedirs: 
    i += 1 
    indir = osjoin(inbase, d) 
    outdir = osjoin(outbase, d) 
    Q.put(d) 

# this loop creates the multiple processes 
procs = [] 
for pid in range(num_p): 
    try: 
     p = Process(target=run, args=(Q, pid+1)) 
     p.daemon = True 
     procs.append(p) 
     p.start() 
    except KeyboardInterrupt: 
     print "Aborting start of child processes" 
     for x in procs: 
      x.terminate() 
     exit("Interrupted")  

try: 
    for p in procs: 
     p.join() 
except: 
    print "Terminating child processes" 
    for x in procs: 
     x.terminate() 
    exit("Interrupted") 

print "Parsing output..." 
for d in genedirs: extractGff(d) 

現在的問題是,abfgp.py使用os.chdir函數,這似乎中斷了並行處理。我收到很多錯誤,指出某些(輸入/輸出)文件/目錄不能用於讀/寫。即使我通過os.system()調用腳本,從中我通過產生單獨的進程可以防止這種情況。

我該如何解決這些chdir干擾問題?

編輯:我可能將os.system()更改爲subprocess.Popen(cwd =「...」)與正確的目錄。我希望這有所作爲。

謝謝。

+1

爲什麼使用'os.system'而不是'subprocess.call'?如果沒有字符串插值,它會更加麻煩。 –

+0

好的提示,你是對的:),但正如我所說,我雖然os.system會解決chdir干擾 – Sander

回答

0

編輯2

不要使用os.system()使用subprocess.call()

system("%s --dna %s --multifasta %s --target %s -o %s -q >>%s" %(abfgp_cmd, osjoin(inbase, id_, id_ +".dna.fa"), osjoin(inbase, id_, "informants.mfa"), id_, out, log)) 

將轉化

subprocess.call((abfgp_cmd, '--dna', osjoin(inbase, id_, id_ +".dna.fa"), '--multifasta', osjoin(inbase, id_, "informants.mfa"), '--target', id_, '-o', out, '-q')) # without log. 

編輯1 我認爲問題是,多使用MODUL e名稱來序列化函數,類。

這意味着,如果你做import module其中module是./module.py和你這樣做os.chdir('./dir')現在你需要from .. import module

子進程繼承父進程的文件夾。這可能是一個問題。

解決方案

  1. 確保所有模塊都是進口的(在子進程),並在此之後更改目錄
  2. 插入原始os.getcwd()sys.path啓用從原來的目錄導入。這必須在從本地目錄調用任何函數之前完成。
  3. 將您使用的所有功能放入一個始終可以導入的目錄中。 site-packages可能就是這樣一個目錄。然後,您可以執行諸如import modulemodule.main()之類的操作來啓動您的操作。
  4. 這是我做的一個黑客,因爲我知道鹹菜是如何工作的。只有在其他嘗試失敗時才使用它 腳本打印:

    serialized # the function runD is serialized 
    string executed # before the function is loaded the code is executed 
    loaded # now the function run is deserialized 
    run # run is called 
    

    在你的情況下,你會做這樣的事情:

    runD = evalBeforeDeserialize('__import__("sys").path.append({})'.format(repr(os.getcwd())), run) 
    p = Process(target=runD, args=(Q, pid+1)) 
    

    這是腳本:

    # functions that you need 
    
    class R(object): 
        def __init__(self, call, *args): 
    
         self.ret = (call, args) 
        def __reduce__(self): 
         return self.ret 
        def __call__(self, *args, **kw): 
         raise NotImplementedError('this should never be called') 
    
    class evalBeforeDeserialize(object): 
        def __init__(self, string, function): 
         self.function = function 
         self.string = string 
        def __reduce__(self): 
         return R(getattr, tuple, '__getitem__'), \ 
           ((R(eval, self.string), self.function), -1) 
    
    # code to show how it works   
    
    def printing(): 
        print('string executed') 
    
    def run(): 
        print('run') 
    
    runD = evalBeforeDeserialize('__import__("__main__").printing()', run) 
    
    import pickle 
    
    s = pickle.dumps(runD) 
    print('serialized') 
    run2 = pickle.loads(s) 
    print('loaded') 
    run2() 
    

請報告回來,如果這些做不行。

+0

我感謝你的努力,但我認爲你讓我錯了。我無法更改「abfgp.py」中的代碼,這是使用chdir的代碼。所以如果我產生abfgp.py的多個進程,他們將每個進程chdir。這些不同的過程互相干擾,改變彼此的輸入和輸出目錄。所以我不能改變進口。 – Sander

+0

這是否真的意味着如果在一個進程中執行'os.chdir',它會在另一個進程中更改'os.getcwd()'? – User

+0

我覺得它也很奇怪,但這正是我所經歷的。我認爲cwd存儲在sys.path中(沒有檢查),這對全部python進程是全局的嗎?我可能會嘗試這一個:http://stackoverflow.com/questions/13757734/working-in-different-directories-os-chdir-in-the-same-time-parallel-threading – Sander

0

您可以確定os庫的哪個實例是不可更改的程序正在使用的;然後在該庫中創建一個定製版本chdir,它可以執行所需的任務 - 防止目錄更改,記錄它,無論如何。如果量身定製的行爲只需要用於單個程序,則可以使用inspect模塊來識別調用者,並以特定方式調整該調用者的行爲。

如果您確實無法更改現有程序,您的選擇是有限的;但如果你有選擇改變它導入的庫,這樣的東西可能是一種最不具有侵入性的方式來避免不良行爲。

通常的注意事項適用於更改標準庫時。