2011-05-14 75 views

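I have a function in Jython that uses Popen to run another program, which writes an XML file to its stdout; that stdout is redirected to a file. When the process has finished, I close the file and call another function to parse it. During parsing I was getting a bunch of error messages about accessing a closed file and/or malformed XML files (which look fine when I inspect them). I thought output.close() might be returning before the file was actually closed, so I added a loop that waits for output.closed to be true. This seemed to work at first, but later my program printed the following nio errors while parsing the XML file: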

blasting 
blasted 
parsing 
parsed 
    Extending genes found via genemark, 10.00% done 
blasting 
blasted 
parsing 
Exception in thread "_CouplerThread-7 (stdout)" Traceback (most recent call last): 
    File "/Users/mbsulli/jython/Lib/subprocess.py", line 675, in run 
    self.write_func(buf) 
IOError: java.nio.channels.AsynchronousCloseException 
[Fatal Error] 17_2_corr.blastp.xml:15902:63: XML document structures must start and end within the same entity. 
Retry 
blasting 
blasted 
parsing 
Exception in thread "_CouplerThread-9 (stdout)" Traceback (most recent call last): 
    File "/Users/mbsulli/jython/Lib/subprocess.py", line 675, in run 
    self.write_func(buf) 
IOError: java.nio.channels.ClosedChannelException 
[Fatal Error] 17_2_corr.blastp.xml:15890:30: XML document structures must start and end within the same entity. 
Retry 
blasting 

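I don't know what my options are from here. Am I right in thinking that the XML has not been completely written before I parse it? If so, how can I make sure that it has been?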

def parseBlast(fileName): 
    """ 
    A function for parsing XML blast output. 
    """ 
    print "parsing" 
    reader = XMLReaderFactory.createXMLReader() 
    reader.entityResolver = reader.contentHandler = BlastHandler() 
    reader.parse(fileName) 
    print "parsed" 

    return dict(map(lambda iteration: (iteration.query, iteration), reader.getContentHandler().iterations)) 

def cachedBlast(fileName, blastLocation, database, eValue, query, pipeline, remote = False, force = False): 
    """ 
    Performs a blast search using the blastp executable and database in blastLocation on 
    the query with the eValue. The result is an XML file saved to fileName. If fileName 
    already exists the search is skipped. If remote is true then the search is done remotely. 
    """ 
    if not os.path.isfile(fileName) or force:
        output = open(fileName, "w")
        command = [blastLocation + "/bin/blastp",
                   "-evalue", str(eValue),
                   "-outfmt", "5",
                   "-query", query]
        if remote:
            command += ["-remote",
                        "-db", database]
        else:
            command += ["-num_threads", str(Runtime.getRuntime().availableProcessors()),
                        "-db", database]
        print "blasting"
        blastProcess = subprocess.Popen(command,
                                        stdout = output)
        while blastProcess.poll() == None:
            if pipeline.exception:
                print "Stopping in blast"
                blastProcess.kill()
                output.close()
                raise pipeline.exception
        output.close()
        while not output.closed:
            pass
        print "blasted"
    try:
        return parseBlast(fileName)
    except SAXParseException:
        print 'Retry'
        return cachedBlast(fileName, blastLocation, database, eValue, query, pipeline, remote, True)
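
The tracebacks above come from a `_CouplerThread` in `subprocess.py` that is still writing when the file gets closed, which suggests that `poll()` returning a value does not by itself mean all output has reached the file. The following is only a minimal sketch of one possible ordering: keep the polling loop so the process can still be stopped, then call `wait()` before closing the file. The names `run_to_file` and `should_stop` (a callable standing in for the `pipeline.exception` check) are hypothetical, and the assumption that `wait()` on Jython also lets the copying thread finish has not been verified here.

```python
import subprocess
import time


def run_to_file(command, file_name, should_stop=lambda: False, poll_interval=0.1):
    """Run command with stdout redirected to file_name and return its exit code.

    should_stop is a hypothetical callable that replaces the
    pipeline.exception check used in the code above.
    """
    output = open(file_name, "w")
    try:
        process = subprocess.Popen(command, stdout=output)
        while process.poll() is None:
            if should_stop():
                process.kill()
                break
            time.sleep(poll_interval)  # sleep instead of spinning in a busy loop
        # Block until the child has really exited before the file is closed.
        # Assumption: on Jython this also gives the output-copying thread
        # a chance to finish writing.
        return process.wait()
    finally:
        output.close()
```

The file is closed in a `finally` clause, so it is released on every path, and only after `wait()` has returned or an exception has been raised.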

Answer


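I think the problem started when I switched from calling wait on the subprocess to using the poll method, so that I could stop the process while it was running. Because I already had results for many of the data sets I was working with, the subprocess rarely had to be started again, so it is hard to say for sure. In any case, my guess is that output was still being written when I closed the file, and my solution was to switch to a pipe and write the file myself.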

def cachedBlast(fileName, blastLocation, database, eValue, query, pipeline, remote = False, force = False):
    """
    Performs a blast search using the blastp executable and database in blastLocation on
    the query with the eValue. The result is an XML file saved to fileName. If fileName
    already exists the search is skipped. If remote is true then the search is done remotely.
    """
    if not os.path.isfile(fileName) or force:
        output = open(fileName, "w")
        command = [blastLocation + "/bin/blastp",
                   "-evalue", str(eValue),
                   "-outfmt", "5",
                   "-query", query]
        if remote:
            command += ["-remote",
                        "-db", database]
        else:
            command += ["-num_threads", str(Runtime.getRuntime().availableProcessors()),
                        "-db", database]
        blastProcess = subprocess.Popen(command,
                                        stdout = subprocess.PIPE)
        while blastProcess.poll() == None:
            output.write(blastProcess.stdout.read())
            if pipeline.exception:
                psProcess = subprocess.Popen(["ps", "aux"], stdout = subprocess.PIPE)
                awkProcess = subprocess.Popen(["awk", "/" + " ".join(command).replace("/", "\\/") + "/"], stdin = psProcess.stdout, stdout = subprocess.PIPE)
                for line in awkProcess.stdout:
                    subprocess.Popen(["kill", "-9", re.split(r"\s+", line)[1]])
                output.close()
                raise pipeline.exception
        remaining = blastProcess.stdout.read()
        while remaining:
            output.write(remaining)
            remaining = blastProcess.stdout.read()

        output.close()

    try:
        return parseBlast(fileName)
    except SAXParseException:
        return cachedBlast(fileName, blastLocation, database, eValue, query, pipeline, remote, True)
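
One thing to be aware of in the code above: `stdout.read()` with no argument normally blocks until the pipe reaches end of file, so the stop check may only run after the child has already finished. A possible variation, shown here only as a sketch, reads the pipe in fixed-size chunks and checks for a stop request between chunks. The names `copy_stdout_to_file`, `should_stop` and `chunk_size` are hypothetical and not part of the original code, and the check still only runs when the child produces output.

```python
import subprocess


def copy_stdout_to_file(command, file_name, should_stop=lambda: False, chunk_size=8192):
    """Copy the child's stdout into file_name chunk by chunk; return the exit code.

    should_stop is a hypothetical callable that replaces the
    pipeline.exception check used in the code above.
    """
    process = subprocess.Popen(command, stdout=subprocess.PIPE)
    output = open(file_name, "wb")
    try:
        while True:
            chunk = process.stdout.read(chunk_size)
            if not chunk:  # an empty read means the child closed its stdout
                break
            output.write(chunk)
            if should_stop():
                process.kill()
                break
    finally:
        # Close the file only after the copy loop has ended.
        output.close()
        process.stdout.close()
    return process.wait()
```

Because the same code both reads the pipe and writes the file, the file cannot be closed while output is still being copied into it.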