0
我有一個長期運行的計算模型,我希望控制,將數據輸入到數據並使用STDIN
和讀取數據。在這個外部代碼裏面,有一個控制反饋循環需要從STDIN
每100ms左右的新數據。寫入到標準輸入並從標準輸出中讀取長時間運行的子進程中的python
因此,subprocess.communicate()
是不合適的,因爲它等待過程完成。該過程的估計運行時間爲數週。
下面的代碼不起作用,因爲控制掛在stdout.read()
並永不回來。
通過STDIN和STDOUT討論的正確方法是什麼?
import subprocess as sb
class Foo:
def process_output(self,values: str) ->():
""" gets 7 comma separated floats back from ADC.elf and returns them as a tuple of two vectors """
floats = [float(f) for f in values.split(',') if values and f]
# if len(floats) == 7:
mag = (floats[0], floats[1], floats[2])
quat = (floats[3], floats[4], floats[5], floats[6])
return (mag, quat)
def format_input(self,invals: {}) -> bytes:
""" takes a dict of arrays and stuffs them into a comma-separated bytestring to send to ADC.elf with a trailing newline"""
concat = lambda s, f: ''.join([f % x for x in s])
retval = ''
retval += concat(invals['mag_meas'], '%3.2f,')
retval += concat(invals['euler_angle'], '%3.2f,')
retval += concat(invals['sun_meas'], '%3.2f,')
retval += concat(invals['epoch'], '%02.0f,')
retval += concat(invals['lla'], '%3.2f,')
retval += concat([invals['s_flag']], '%1.0f,')
retval = retval[:-1]
retval += '\n'
return retval.encode('utf-8')
def page(self,input: {}) ->():
""" send a bytestring with 19 floats to ADC.elf. Process the returned 7 floats into a data struture"""
formatted = self.format_input(input)
self.pid.stdin.write(formatted)
response = self.pid.stdout.read()
return self.process_output(response.decode())
def __init__(self):
""" start the long-running process ADC.elf that waits for input and performs some scientific computation"""
self.pid = sb.Popen(args=['./ADC.elf'], stdin=sb.PIPE, stdout=sb.PIPE, stderr=sb.PIPE)
def exit(self):
""" send SIGTERM to ADC.elf"""
self.pid.terminate()
if __name__ == "__main__":
# some dummy data
testData = {'mag_meas': [1, 2, 3],
'euler_angle': [4, 5, 6],
'sun_meas': [7, 8, 9],
'epoch': [0, 1, 2, 3, 4, 5],
'lla': [6, 7, 8],
's_flag': 9
}
#initialize
runner = Foo()
# send and receive once.
result = runner.page(testData)
print(result)
#clean up
runner.exit()
['溝通()'](https://github.com/python/cpython /blob/eb70a87363193a22a2ee36a44efd8372f97efeae/Lib/subprocess.py#L1051)本身使用線程。 ['select'](https://docs.python.org/2/library/select.html)也是一個選項。正如非阻塞IO。 – dhke
@dhke這是真的,但溝通()將不會返回,直到過程完成;我想在它的單個執行過程中多次查詢它。 – gvoysey
因此,我的建議是:看看如何溝通,並適應您的需求。 'communic()'顯式等待讀者線程。放下那個,你有連續的投票。 – dhke