2017-01-23 131 views
0

我有一個長期運行的計算模型,我希望控制,將數據輸入到數據並使用STDIN和讀取數據。在這個外部代碼裏面,有一個控制反饋循環需要從STDIN每100ms左右的新數據。寫入到標準輸入並從標準輸出中讀取長時間運行的子進程中的python

因此,subprocess.communicate()是不合適的,因爲它等待過程完成。該過程的估計運行時間爲數週。

下面的代碼不起作用,因爲控制掛在stdout.read()並永不回來。

通過STDIN和STDOUT討論的正確方法是什麼?

import subprocess as sb 


class Foo: 
    def process_output(self,values: str) ->(): 
     """ gets 7 comma separated floats back from ADC.elf and returns them as a tuple of two vectors """ 
     floats = [float(f) for f in values.split(',') if values and f] 
     # if len(floats) == 7: 
     mag = (floats[0], floats[1], floats[2]) 
     quat = (floats[3], floats[4], floats[5], floats[6]) 
     return (mag, quat) 

    def format_input(self,invals: {}) -> bytes: 
     """ takes a dict of arrays and stuffs them into a comma-separated bytestring to send to ADC.elf with a trailing newline""" 
     concat = lambda s, f: ''.join([f % x for x in s]) 
     retval = '' 
     retval += concat(invals['mag_meas'], '%3.2f,') 
     retval += concat(invals['euler_angle'], '%3.2f,') 
     retval += concat(invals['sun_meas'], '%3.2f,') 
     retval += concat(invals['epoch'], '%02.0f,') 
     retval += concat(invals['lla'], '%3.2f,') 
     retval += concat([invals['s_flag']], '%1.0f,') 
     retval = retval[:-1] 
     retval += '\n' 
     return retval.encode('utf-8') 

    def page(self,input: {}) ->(): 
     """ send a bytestring with 19 floats to ADC.elf. Process the returned 7 floats into a data struture""" 
     formatted = self.format_input(input) 
     self.pid.stdin.write(formatted) 
     response = self.pid.stdout.read() 

     return self.process_output(response.decode()) 

    def __init__(self): 
     """ start the long-running process ADC.elf that waits for input and performs some scientific computation""" 
     self.pid = sb.Popen(args=['./ADC.elf'], stdin=sb.PIPE, stdout=sb.PIPE, stderr=sb.PIPE) 

    def exit(self): 
     """ send SIGTERM to ADC.elf""" 
     self.pid.terminate() 



if __name__ == "__main__": 
    # some dummy data 
    testData = {'mag_meas': [1, 2, 3], 
       'euler_angle': [4, 5, 6], 
       'sun_meas': [7, 8, 9], 
       'epoch': [0, 1, 2, 3, 4, 5], 
       'lla': [6, 7, 8], 
       's_flag': 9 
       } 
    #initialize 
    runner = Foo() 
    # send and receive once. 
    result = runner.page(testData) 
    print(result) 
    #clean up 
    runner.exit() 
+0

['溝通()'](https://github.com/python/cpython /blob/eb70a87363193a22a2ee36a44efd8372f97efeae/Lib/subprocess.py#L1051)本身使用線程。 ['select'](https://docs.python.org/2/library/select.html)也是一個選項。正如非阻塞IO。 – dhke

+0

@dhke這是真的,但溝通()將不會返回,直到過程完成;我想在它的單個執行過程中多次查詢它。 – gvoysey

+0

因此,我的建議是:看看如何溝通,並適應您的需求。 'communic()'顯式等待讀者線程。放下那個,你有連續的投票。 – dhke

回答

0

不知道如何與子直接做到這一點,但pexpect做完全正確的事情:

import pexpect, os 
from time import sleep 

class Foo: 
    def process_output(self,values: str) ->(): 
     floats = [float(f) for f in values.split(',') if values and f] 
     # if len(floats) == 7: 
     mag = (floats[0], floats[1], floats[2]) 
     quat = (floats[3], floats[4], floats[5], floats[6]) 
     return (mag, quat) 

    def format_input(self,invals: {}) -> bytes: 
     concat = lambda s, f: ''.join([f % x for x in s]) 
     retval = '' 
     retval += concat(invals['mag_meas'], '%3.2f,') 
     retval += concat(invals['euler_angle'], '%3.2f,') 
     retval += concat(invals['sun_meas'], '%3.2f,') 
     retval += concat(invals['epoch'], '%02.0f,') 
     retval += concat(invals['lla'], '%3.2f,') 
     retval += concat([invals['s_flag']], '%1.0f,') 
     retval = retval[:-1] 
     retval += '\n' 
     return retval.encode('utf-8') 

    def page(self,input: {}) ->(): 
     formatted = self.format_input(input) 
     self.pid.write(formatted) 
     response = self.pid.readline() 

     return self.process_output(response.decode()) 

    def __init__(self): 

     self.pid = pexpect.spawn('./ADC.elf') 
     self.pid.setecho(False) 

    def exit(self): 
     self.pid.terminate() 



if __name__ == "__main__": 
    testData = {'mag_meas': [1, 2, 3], 
       'euler_angle': [4, 5, 6], 
       'sun_meas': [7, 8, 9], 
       'epoch': [0, 1, 2, 3, 4, 5], 
       'lla': [6, 7, 8], 
       's_flag': 9 
       } 
    runner = Foo() 
    i = 0 
    while i < 100: 
     result = runner.page(testData) 
     print(result) 
     i += 1 
     sleep(.1) 



    runner.exit() 
相關問題