2015-06-19 63 views
7

我正試圖弄懂 Python 3 的 asyncio 模塊,特別是傳輸/協議(Transport/Protocol)API。我想實現一個發佈/訂閱模式,並使用 asyncio.Protocol 類創建我的客戶端和服務器。(標題:使用隊列的 asyncio 持久(persistent)客戶端協議類)

目前,我已啓動並運行服務器,並偵聽傳入的客戶端連接。客戶端能夠連接到服務器,發送消息並接收答覆。

我希望能夠保持TCP連接處於活動狀態,並維護一個允許我添加消息的隊列。我試圖找到一種使用低級API(Transport/Protocol)來做到這一點的方法,但網上有限的asyncio文檔/示例似乎都偏向高級API——使用流(streams)等。有人能為我指出實現這一點的正確方向嗎?

這裏的服務器代碼:

#!/usr/bin/env python3 

import asyncio 
import json 


class SubscriberServerProtocol(asyncio.Protocol):
    """Server protocol that answers subscribe/unsubscribe requests.

    Requests and replies are JSON objects encoded as UTF-8.  The channel
    counts in the replies are hard-coded placeholders; no subscription
    state is kept by this class.
    """

    def connection_made(self, transport):
        """Store the transport and the peer address of the new client."""
        self.peername = transport.get_extra_info('peername')
        print('connection from {}'.format(self.peername))
        self.transport = transport

    def data_received(self, data):
        """Answer one request with a compact JSON reply.

        The request is expected to be a JSON object with the fields:

            type:    subscribe/unsubscribe
            channel: the name of the channel

        A subscribe/unsubscribe request is answered with:

            type:          subscribe/unsubscribe
            channel:       the channel named in the request
            channel_count: 10 after a subscribe, 9 after an unsubscribe

        Any other request (unknown type, invalid JSON, not a JSON object,
        missing field) is answered with ``{"type":"unknown_type"}``.
        """
        reply = self._build_reply(data)
        send_message = json.dumps(reply, separators=(',', ':'))
        print('Sending {!r}'.format(send_message))
        self.transport.write(send_message.encode())

    def _build_reply(self, data):
        """Return the reply dict for the raw request bytes ``data``."""
        try:
            request = json.loads(data.decode())
        except ValueError:
            # Covers both undecodable bytes (UnicodeDecodeError) and
            # invalid JSON; letting these escape would make the event
            # loop treat the connection as fatally broken.
            return self._reject(data)
        if not isinstance(request, dict) or 'type' not in request:
            return self._reject(data)

        kind = request['type']
        if kind == 'subscribe':
            log_format, channel_count = 'Client {} subscribed to {}', 10
        elif kind == 'unsubscribe':
            log_format, channel_count = 'Client {} unsubscribed from {}', 9
        else:
            print('Invalid message type {}'.format(kind))
            return {'type': 'unknown_type'}

        if 'channel' not in request:
            return self._reject(data)
        channel = request['channel']
        print(log_format.format(self.peername, channel))
        return {'type': kind,
                'channel': channel,
                'channel_count': channel_count}

    def _reject(self, data):
        """Log a malformed request and return the ``unknown_type`` reply."""
        print('Malformed message from {}: {!r}'.format(self.peername, data))
        return {'type': 'unknown_type'}

    def eof_received(self):
        """Close the transport when the client signals end-of-file."""
        print('Client {} closed connection'.format(self.peername))
        self.transport.close()

    def connection_lost(self, exc):
        """Log the error, if any, that ended the connection.

        ``exc`` is falsy when the connection was closed without an error.
        """
        if exc:
            print('{} {}'.format(exc, self.peername))


# Create the loop explicitly: asyncio.get_event_loop() is not guaranteed to
# create one implicitly on recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Each client connection gets its own protocol instance
coro = loop.create_server(SubscriberServerProtocol, '127.0.0.1', 10666)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    # Stop listening, wait for the server to finish closing, then release
    # the loop.  Errors here are real shutdown failures and are not hidden.
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()

而這裏的客戶端代碼:

#!/usr/bin/env python3 

import asyncio 
import json 


class SubscriberClientProtocol(asyncio.Protocol):
    """One-shot client protocol: send a single message, print the replies."""

    def __init__(self, message, loop):
        # Text to send once connected, and the loop to stop on disconnect.
        self.loop = loop
        self.message = message

    def connection_made(self, transport):
        """Send the stored message as soon as the connection is up.

        The message is expected to carry the following items:
            type:    subscribe/unsubscribe
            channel: the name of the channel
        """
        payload = self.message.encode()
        transport.write(payload)
        report = 'Message sent: {!r}'.format(self.message)
        print(report)

    def data_received(self, data):
        """Print whatever the server sent back, decoded as text.

        The reply is expected to consist of three fields:
            type:          subscribe/unsubscribe
            channel:       the name of the channel
            channel_count: the amount of channels subscribed to
        """
        text = data.decode()
        print('Message received: {!r}'.format(text))

    def connection_lost(self, exc):
        """Report the disconnect and stop the event loop."""
        for line in ('The server closed the connection',
                     'Stop the event loop'):
            print(line)
        self.loop.stop()

if __name__ == '__main__':
    message = json.dumps({'type': 'subscribe', 'channel': 'sensor'},
                         separators=(',', ':'))

    # Create the loop explicitly: asyncio.get_event_loop() is not guaranteed
    # to create one implicitly on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        coro = loop.create_connection(
            lambda: SubscriberClientProtocol(message, loop),
            '127.0.0.1', 10666)
        transport, _ = loop.run_until_complete(coro)
        try:
            # Runs until the protocol's connection_lost() stops the loop.
            loop.run_forever()
        except KeyboardInterrupt:
            print('Closing connection')
            if not transport.is_closing():
                transport.close()
                # Closing the transport triggers connection_lost(), which
                # stops the loop, so this returns once the close is done.
                loop.run_forever()
    finally:
        # Release the loop even when connecting failed.
        loop.close()

回答

9

就你想做的事情而言,你的服務器照現在這樣就可以了;你寫的代碼實際上已經保持了TCP連接的活動狀態,只是缺少一條持續向它提供新消息的管道。要做到這一點,您需要調整客戶端代碼,以便可以隨時向其中添加新消息,而不僅僅是在connection_made回調觸發時才發送。

這很容易;我們會將內部asyncio.Queue添加到可以接收消息的ClientProtocol,然後在消耗來自Queue的消息的無限循環中運行協程,並將它們發送到服務器。最後一部分是實際存儲您從create_connection調用中獲得的ClientProtocol實例,然後將其傳遞給實際發送消息的協程。

import asyncio 
import json 

class SubscriberClientProtocol(asyncio.Protocol):
    """Client protocol that sends queued messages over one connection.

    Messages handed to :meth:`send_message` are put on an internal queue.
    A background task writes them to the transport, in order, once the
    connection has been made.
    """

    def __init__(self, loop):
        """Create the queue and start the sender task on ``loop``.

        ``loop`` is also the loop that is stopped when the connection is
        lost.
        """
        self.transport = None
        self.loop = loop
        self.queue = asyncio.Queue()
        self._ready = asyncio.Event()
        # Keep a reference so the task can be cancelled on disconnect.
        self._sender_task = loop.create_task(self._send_messages())

    async def _send_messages(self):
        """Send messages to the server as they become available."""
        # Nothing can be written until connection_made() set the transport.
        await self._ready.wait()
        print("Ready!")
        while True:
            data = await self.queue.get()
            self.transport.write(data.encode('utf-8'))
            print('Message sent: {!r}'.format(data))

    def connection_made(self, transport):
        """Store the transport and release the sender task.

        Messages are expected to carry the following items:
            type:    subscribe/unsubscribe
            channel: the name of the channel
        """
        self.transport = transport
        print("Connection made.")
        self._ready.set()

    async def send_message(self, data):
        """Feed a message (a ``str``) to the sender task."""
        await self.queue.put(data)

    def data_received(self, data):
        """Print whatever the server sent back, decoded as text.

        The reply is expected to consist of three fields:
            type:          subscribe/unsubscribe
            channel:       the name of the channel
            channel_count: the amount of channels subscribed to
        """
        print('Message received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        """Stop the sender task and the event loop."""
        # The transport is gone; the sender must not write to it any more.
        self._sender_task.cancel()
        print('The server closed the connection')
        print('Stop the event loop')
        self.loop.stop()

async def feed_messages(protocol):
    """Example producer: send the same subscribe message once per second.

    ``protocol`` must provide an awaitable ``send_message(data)``.  This
    coroutine never returns on its own; cancel its task to stop it.

    It is a native coroutine because the generator-based
    ``@asyncio.coroutine`` form no longer exists on current Python versions.
    """
    message = json.dumps({'type': 'subscribe', 'channel': 'sensor'},
                         separators=(',', ':'))
    while True:
        await protocol.send_message(message)
        await asyncio.sleep(1)

if __name__ == '__main__':
    message = json.dumps({'type': 'subscribe', 'channel': 'sensor'},
                         separators=(',', ':'))

    # Create the loop explicitly: asyncio.get_event_loop() is not guaranteed
    # to create one implicitly on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        coro = loop.create_connection(lambda: SubscriberClientProtocol(loop),
                                      '127.0.0.1', 10666)
        transport, proto = loop.run_until_complete(coro)
        # loop.create_task replaces asyncio.async(), whose name is a
        # reserved keyword on current Python versions.
        feeder = loop.create_task(feed_messages(proto))
        try:
            # Runs until the protocol's connection_lost() stops the loop.
            loop.run_forever()
        except KeyboardInterrupt:
            print('Closing connection')
            feeder.cancel()
            if not transport.is_closing():
                transport.close()
                # Closing the transport triggers connection_lost(), which
                # stops the loop, so this returns once the close is done.
                loop.run_forever()
    finally:
        # Release the loop even when connecting failed.
        loop.close()
+0

感謝@dano快速而詳盡的回答。我把這些更改應用到我的代碼中,效果非常好! – thiezn

+0

@dano非常感謝。這正是我所尋找的。 – user24502

相關問題