2014-09-02 34 views
2

我正在嘗試編寫一個服務器端事件服務器,我可以使用telnet連接到該服務器,並將telnet內容推送到瀏覽器。使用Python和asyncio的想法是使用盡可能小的CPU,因爲它將在Raspberry Pi上運行。Python - asyncio協議/服務器之間的通信

到目前爲止,我有以下使用在這裏找到的庫:https://pypi.python.org/pypi/asyncio-sse/0.1它使用asyncio。

而且我也複製了一個使用asyncio的telnet服務器。

兩者都單獨工作,但我不知道如何將兩者結合在一起。據我瞭解,我需要在Telnet.data_receivedSSEHandler類中調用send(),但我不知道如何訪問它。這兩個「服務器」都需要在一個循環中運行,以接受新的連接或推送數據。

任何人都可以幫忙,或指向另一個方向嗎?我處於我不知道谷歌的階段

import asyncio 
import sse 

# Get an instance of the asyncio event loop 
loop = asyncio.get_event_loop() 

# Setup SSE address and port 
sse_host, sse_port = '192.168.2.25', 8888 

class Telnet(asyncio.Protocol): 
    def connection_made(self, transport): 
     print("Connection received!"); 
     self.transport = transport 

    def data_received(self, data): 
     print(data) 
     self.transport.write(b'echo:') 
     self.transport.write(data) 

     # This is where I want to send data via SSE 
     # SSEHandler.send(data) 

     # Things I've tried :(
     #loop.call_soon_threadsafe(SSEHandler.handle_request()); 
     #loop.call_soon_threadsafe(sse_server.send("PAH!")); 

    def connection_lost(self, esc): 
     print("Connection lost!") 
     telnet_server.close() 

class SSEHandler(sse.Handler): 
    @asyncio.coroutine 
    def handle_request(self): 
     self.send('Working') 

# SSE server 
sse_server = sse.serve(SSEHandler, sse_host, sse_port) 

# Telnet server 
telnet_server = loop.run_until_complete(loop.create_server(Telnet, '192.168.2.25', 7777)) 

#telnet_server.something = sse_server; 

loop.run_until_complete(sse_server) 
loop.run_until_complete(telnet_server.wait_closed()) 

回答

4

服務器端事件是一種http協議;並且您可能在任何特定時刻都有任何併發​​http請求在運行,如果沒有人連接或幾十個,您可能有零個。這種細微差別全部包含在兩個sse.servesse.Handler構造中;前者代表一個單一的偵聽端口,將每個單獨的客戶請求分派給後者。

此外,sse.Handler.handle_request()爲每個客戶端調用一次,並且一旦該協程序終止,客戶端就會斷開連接。在你的代碼中,該協程立即終止,所以客戶端看到一個「工作」事件。所以,我們需要等待,或多或少永遠。我們可以通過yield from來做到這一點asyncio.Future()

第二個問題是,我們需要以某種方式獲得對SSEHandler()的所有單獨實例的控制權,並在其中每個方法上使用send()方法。那麼,我們可以讓他們自己註冊他們的handle_request()方法;通過將每個處理程序實例添加到一個將它們映射到它們正在等待的未來的字典中。

class SSEHandler(sse.Handler): 
    _instances = {} 

    @asyncio.coroutine 
    def handle_request(self): 
     self.send('Working') 
     my_future = asyncio.Future() 
     SSEHandler._instances[self] = my_future 
     yield from my_future 

現在,對事件發送到每一個聆聽我們剛剛訪問所有在我們創建的字典中註冊的SSEHandler實例和對每一個使用send()

class SSEHandler(sse.Handler): 

    #... 

    @classmethod 
    def broadcast(cls, message): 
     for instance, future in cls._instances.items(): 
      instance.send(message) 

class Telnet(asyncio.Protocol): 

    #... 

    def data_received(self, data): 
     #... 
     SSEHandler.broadcast(data.decode('ascii')) 

最後,當telnet連接關閉時,您的代碼將退出。這很好,但我們當時也應該清理。幸運的是,這只是一個爲所有的處理程序

class SSEHandler(sse.Handler): 

    #... 

    @classmethod 
    def abort(cls): 
     for instance, future in cls._instances.items(): 
      future.set_result(None) 
     cls._instances = {} 

class Telnet(asyncio.Protocol): 

    #... 

    def connection_lost(self, esc): 
     print("Connection lost!") 
     SSEHandler.abort() 
     telnet_server.close() 

的所有期貨的設定結果的事這裏是萬一一個完整的,工作轉儲我的例證並不明顯。

import asyncio 
import sse 

loop = asyncio.get_event_loop() 
sse_host, sse_port = '0.0.0.0', 8888 

class Telnet(asyncio.Protocol): 
    def connection_made(self, transport): 
     print("Connection received!"); 
     self.transport = transport 

    def data_received(self, data): 
     SSEHandler.broadcast(data.decode('ascii')) 

    def connection_lost(self, esc): 
     print("Connection lost!") 
     SSEHandler.abort() 
     telnet_server.close() 

class SSEHandler(sse.Handler): 
    _instances = {} 
    @classmethod 
    def broadcast(cls, message): 
     for instance, future in cls._instances.items(): 
      instance.send(message) 

    @classmethod 
    def abort(cls): 
     for instance, future in cls._instances.items(): 
      future.set_result(None) 
     cls._instances = {} 

    @asyncio.coroutine 
    def handle_request(self): 
     self.send('Working') 
     my_future = asyncio.Future() 
     SSEHandler._instances[self] = my_future 
     yield from my_future 

sse_server = sse.serve(SSEHandler, sse_host, sse_port) 
telnet_server = loop.run_until_complete(loop.create_server(Telnet, '0.0.0.0', 7777)) 
loop.run_until_complete(sse_server) 
loop.run_until_complete(telnet_server.wait_closed()) 
+0

非常感謝你解釋如何和爲什麼,非常非常有用。看起來我還有很多東西需要學習。 我得到你的示例代碼運行,它正在做我想要/需要的,和非常低的CPU。再次感謝 – 2014-09-02 15:25:43