2016-08-15 57 views
3

我有一個從文件描述符(包裝在C++端FILE*中)讀取的Python函數(用C++實現),我需要從asyncio.StreamReader提供函數。具體而言,讀者是HTTP響應的內容:aiohttp.ClientResponse.content從asyncio StreamReader將字節泵入文件描述符

我想我可能open a pipe,將讀取端傳遞給C++函數,並將connect the write-end傳遞給asyncio的事件循環。但是,如何通過適當的流量控制和儘可能少的複製將數據從流讀取器移動到管道?

與缺少的部分代碼的骨架如下:

# obtain the StreamReader from aiohttp 
content = aiohttp_client_response.content 
# create a pipe 
(pipe_read_fd, pipe_write_fd) = os.pipe() 

# now I need a suitable protocol to manage the pipe transport 
protocol = ? 
(pipe_transport, __) = loop.connect_write_pipe(lambda: protocol, pipe_write_fd) 

# the protocol should start reading from `content` and writing into the pipe 
return pipe_read_fd 

回答

2

subprocess_attach_write_pipe ASYNCIO例如:

rfd, wfd = os.pipe() 
pipe = open(wfd, 'wb', 0) 
transport, _ = await loop.connect_write_pipe(asyncio.Protocol, pipe) 
transport.write(b'data') 

編輯 - 對於寫入流控制,請參閱下面的方法:

這裏是一個可能的FlowControl實施後,StreamWriter.drain啓發:

class FlowControl(asyncio.streams.FlowControlMixin): 
    async def drain(self): 
     await self._drain_helper() 

用法:

transport, protocol = await loop.connect_write_pipe(FlowControl, pipe) 
transport.write(b'data') 
await protocol.drain() 
+0

這顯示瞭如何打開管道與ASYNCIO寫作,但不顯示如何正確地複製從'asyncio.StreamReader'到管道。特別是,如果從管道讀取的隊伍速度太慢以至於跟不上StreamReader,簡單地從閱讀器讀取字節塊並將它們提供給'transport.write'可能會溢出緩衝區。 –

+0

@JanŠpaček看到我關於寫流控制的編輯,希望有所幫助。 – Vincent

0

我解決此問題得到了通過使用ThreadPoolExecutor和阻塞調用os.write

(read_fd, write_fd) = os.pipe() 
task_1 = loop.create_task(pump_bytes_into_fd(write_fd)) 
task_2 = loop.run_in_executor(executor_1, parse_bytes_from_fd(read_fd)) 

async def pump_bytes_into_fd(write_fd): 
    while True: 
     chunk = await stream.read(CHUNK_SIZE) 
     if chunk is None: break 
     # process the chunk 
     await loop.run_in_executor(executor_2, os.write, write_fd, chunk) 

這是至關重要的兩個不同的執行程序用於阻止讀取和寫入以避免死鎖。