1
如何使用asyncio在兩個協程之間實現管道,一個從流中讀取,另一個寫入其中?如何在兩個Python asyncio協程之間使用讀/寫流?
假設我們有這個現有的代碼,兩個簡單的腳本。一個產生到stdout:
# produce.py
import asyncio
import random
import sys
async def produce(stdout):
for i in range(10000):
await asyncio.sleep(random.randint(0, 3))
print(i, file=stdout, flush=True)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(produce(sys.stdout))
loop.close()
以及從標準輸入讀取其他:
# consume.py
async def consume(loop, stdin):
reader = asyncio.StreamReader(loop=loop)
reader_protocol = asyncio.StreamReaderProtocol(reader)
await loop.connect_read_pipe(lambda: reader_protocol, stdin)
while True:
line = await reader.readline()
if not line:
break
print(int(line) ** 2)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(consume(loop, sys.stdin))
loop.close()
很明顯,因爲我們的兩件可以在命令行單獨運行,我們可以使用subprocess
模塊殼管(produce | consume
)。
但是我們希望在Python中實現等價的Unix管道,即連接這兩個現有函數的流。
像這樣的東西是行不通的:
pipe = io.BytesIO()
await asyncio.gather(produce(pipe),
consume(loop, pipe))
如果這兩個功能將操作發電機,我們可以寫這樣的事情(蟒蛇3.6):
async def produce():
for i in range(10000):
await asyncio.sleep(random.randint(0, 3))
yield str(i)
async def consume(generator):
async for value in generator:
print(int(value) ** 2)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(consume(produce()))
loop.close()
有一些地方asyncio API會允許這樣嗎?
謝謝!
這假定流數據是換行符分隔的文本。 –
在我們的情況下,這是總是得到一個完整的路線,但我想你可以使用'read(1024)',而不是如果你想要一個二進制流。 – Natim