2017-10-16 105 views
3

我正在閱讀asyncio程序包中ProtocolTransport類的文檔。具體來說:如何爲asyncio創建自定義傳輸?

當繼承協議類時,建議您重寫某些方法。這些方法是回調函數:它們將在某些事件中由傳輸器調用(例如,在收到某些數據時)。除非您正在實施交通工具,否則您不應該自己撥打電話,

着重強調

因此,在原則上,它應該是可以實現的傳輸,但是......

傳輸是通過ASYNCIO以各種抽象類提供各種溝通渠道。你通常不會自己實例化一個交通工具;相反,您將調用AbstractEventLoop方法(哪一個?)這將創建傳輸並嘗試啓動底層通信通道,並在成功時回撥您。

此外,加上強調

沿着和越過我沒有看到創建自定義運輸任何方式閱讀AbstractEventLoop部分。談到最近在AbstractEventLoop.create_connection(protocol_factory, host=...)只意味着它會創建某種插座...

好了,我的最終目標是使用自定義的傳輸是沒有任何一種插座(也許是StringIO,也許數據庫連接遊標,也許是另一種協議)。


所以,這只是在文檔一個良好的預期,但從來沒有實現過的錯誤,或者是有真正實現無猴修補asyncio和長子犧牲用戶傳輸的方法嗎?

回答

4

需要的鍋爐板代碼了一下,雖然它不是太多:

from asyncio import streams, transports, get_event_loop 


class CustomTransport(transports.Transport): 

    def __init__(self, loop, protocol, *args, **kwargs): 
     self._loop = loop 
     self._protocol = protocol 

    def get_protocol(self): 
     return self._protocol 

    def set_protocol(self, protocol): 
     return self._protocol 

    # Implement read/write methods 

    # [...] 


async def custom_create_connection(protocol_factory, *args, **kwargs): 
    loop = get_event_loop() 
    protocol = protocol_factory() 
    transport = CustomTransport(loop, protocol, *args, **kwargs) 
    return transport, protocol 


async def custom_open_connection(*args, **kwargs): 
    reader = streams.StreamReader() 
    protocol = streams.StreamReaderProtocol(reader) 
    factory = lambda: protocol 
    transport, _ = await custom_create_connection(factory, *args, **kwargs) 
    writer = streams.StreamWriter(transport, protocol, reader) 
    return reader, writer