2017-06-01 54 views
2

我正在編寫一個程序,接受通過AMQP執行網絡請求(CoAP)的RPC請求。處理RPC請求時,aioamqp回調會生成負責網絡IO的任務。這些任務可以被認爲是後臺任務,它將無限期地運行在AMQP上傳輸網絡響應(在這種情況下,一個RPC請求觸發RPC響應和數據流)。Python asyncio:未引用的任務被垃圾收集器銷燬?

我注意到在我的原始代碼中,網絡任務會在看似隨機的時間間隔(在完成之前)被破壞,asyncio會打印下面的警告「任務已被銷燬但正在等待處理」。此問題與此處所述的問題類似:https://bugs.python.org/issue21163

現在我已經通過在模塊級別的列表中存儲一個硬引用來繞過這個問題,這可以防止GC破壞任務對象。但是,我想知道是否有更好的解決方法?理想情況下,我想在RPC回調中調用await任務,但我注意到這阻止了任何進一步的AMQP操作完成 - >例如創建一個新的amqp渠道攤位並通過amqp接收rpc請求也會停止。然而,我不確定是什麼導致了這種拖延(因爲回調本身就是一個協程,我希望等待不會拖延整個aioamqp庫)。

我發佈的RPC客戶端和服務器的源代碼都是基於aioamqp/aiocoap示例。在服務器中,on_rpc_request是AMQP RPC回調send_coap_obs_request是當「obs_tasks.append(任務)」語句刪除了被破壞了網絡協同程序。

client.py:

""" 
    CoAP RPC client, based on aioamqp implementation of RPC examples from RabbitMQ tutorial 
""" 

import base64 
import json 
import uuid 

import asyncio 
import aioamqp 


class CoAPRpcClient(object): 
    def __init__(self): 
     self.transport = None 
     self.protocol = None 
     self.channel = None 
     self.callback_queue = None 
     self.waiter = asyncio.Event() 

    async def connect(self): 
     """ an `__init__` method can't be a coroutine""" 
     self.transport, self.protocol = await aioamqp.connect() 
     self.channel = await self.protocol.channel() 

     result = await self.channel.queue_declare(queue_name='', exclusive=True) 
     self.callback_queue = result['queue'] 

     await self.channel.basic_consume(
      self.on_response, 
      no_ack=True, 
      queue_name=self.callback_queue, 
     ) 

    async def on_response(self, channel, body, envelope, properties): 
     if self.corr_id == properties.correlation_id: 
      self.response = body 

     self.waiter.set() 

    async def call(self, n): 
     if not self.protocol: 
      await self.connect() 
     self.response = None 
     self.corr_id = str(uuid.uuid4()) 
     await self.channel.basic_publish(
      payload=str(n), 
      exchange_name='', 
      routing_key='coap_request_rpc_queue', 
      properties={ 
       'reply_to': self.callback_queue, 
       'correlation_id': self.corr_id, 
      }, 
     ) 
     await self.waiter.wait() 

     await self.protocol.close() 
     return json.loads(self.response) 


async def rpc_client(): 
    coap_rpc = CoAPRpcClient() 

    request_dict = {} 
    request_dict_json = json.dumps(request_dict) 

    print(" [x] Send RPC coap_request({})".format(request_dict_json)) 
    response_dict = await coap_rpc.call(request_dict_json) 
    print(" [.] Got {}".format(response_dict)) 


asyncio.get_event_loop().run_until_complete(rpc_client()) 

server.py:

""" 
CoAP RPC server, based on aioamqp implementation of RPC examples from RabbitMQ tutorial 
""" 

import base64 
import json 
import sys 

import logging 
import warnings 

import asyncio 
import aioamqp 
import aiocoap 

amqp_protocol = None 
coap_client_context = None 
obs_tasks = [] 

AMQP_COAP_NOTIFICATIONS_EXCHANGE_NAME = 'topic_coap' 
AMQP_COAP_NOTIFICATIONS_TOPIC_NAME = 'topic' 
AMQP_COAP_NOTIFICATIONS_ROUTING_KEY = 'coap.response' 

def create_response_dict(coap_request, coap_response): 
    response_dict = {'request_uri': "", 'code': 0} 
    response_dict['request_uri'] = coap_request.get_request_uri() 
    response_dict['code'] = coap_response.code 

    if len(coap_response.payload) > 0: 
     response_dict['payload'] = base64.b64encode(coap_response.payload).decode('utf-8') 

    return response_dict 


async def handle_coap_response(amqp_envelope, amqp_properties, coap_request, coap_response): 
    # create response dict: 
    response_dict = create_response_dict(coap_request, coap_response) 
    message = json.dumps(response_dict) 

    # create new channel: 
    global amqp_protocol 
    amqp_channel = await amqp_protocol.channel() 

    await amqp_channel.basic_publish(
     payload=message, 
     exchange_name='', 
     routing_key=amqp_properties.reply_to, 
     properties={ 
      'correlation_id': amqp_properties.correlation_id, 
     }, 
    ) 

    await amqp_channel.basic_client_ack(delivery_tag=amqp_envelope.delivery_tag) 

    print(" [.] handle_coap_response() published response: {}".format(response_dict)) 


def incoming_observation(coap_request, coap_response): 
    asyncio.async(handle_coap_notification(coap_request, coap_response)) 


async def handle_coap_notification(coap_request, coap_response): 
    # create response dict: 
    response_dict = create_response_dict(coap_request, coap_response) 
    message = json.dumps(response_dict) 

    # create new channel: 
    global amqp_protocol 
    amqp_channel = await amqp_protocol.channel() 

    await amqp_channel.exchange(AMQP_COAP_NOTIFICATIONS_EXCHANGE_NAME, AMQP_COAP_NOTIFICATIONS_TOPIC_NAME) 

    await amqp_channel.publish(message, exchange_name=AMQP_COAP_NOTIFICATIONS_EXCHANGE_NAME, routing_key=AMQP_COAP_NOTIFICATIONS_ROUTING_KEY) 

    print(" [.] handle_coap_notification() published response: {}".format(response_dict)) 


async def send_coap_obs_request(amqp_envelope, amqp_properties, request_dict, coap_request): 
    observation_is_over = asyncio.Future() 
    try: 
     global coap_client_context 
     requester = coap_client_context.request(coap_request) 
     requester.observation.register_errback(observation_is_over.set_result) 
     requester.observation.register_callback(lambda data, coap_request=coap_request: incoming_observation(coap_request, data)) 

     try: 
      print(" [..] Sending CoAP obs request: {}".format(request_dict)) 
      coap_response = await requester.response 
     except socket.gaierror as e: 
      print("Name resolution error:", e, file=sys.stderr) 
      return 
     except OSError as e: 
      print("Error:", e, file=sys.stderr) 
      return 

     if coap_response.code.is_successful(): 
      print(" [..] Received CoAP response: {}".format(coap_response)) 
      await handle_coap_response(amqp_envelope, amqp_properties, coap_request, coap_response) 
     else: 
      print(coap_response.code, file=sys.stderr) 
      if coap_response.payload: 
       print(coap_response.payload.decode('utf-8'), file=sys.stderr) 
      sys.exit(1) 

     exit_reason = await observation_is_over 
     print("Observation is over: %r"%(exit_reason,), file=sys.stderr) 

    finally: 
     if not requester.response.done(): 
      requester.response.cancel() 
     if not requester.observation.cancelled: 
      requester.observation.cancel() 


async def on_rpc_request(amqp_channel, amqp_body, amqp_envelope, amqp_properties): 
    print(" [.] on_rpc_request(): received RPC request: {}".format(amqp_body)) 

    request_dict = {} # hardcoded to vdna.be for SO example 
    aiocoap_code = aiocoap.GET 
    aiocoap_uri = "coap://vdna.be/obs" 
    aiocoap_payload = "" 

    # as we are ready to send the CoAP request, ack the client already indicating we have received the RPC request 
    await amqp_channel.basic_client_ack(delivery_tag=amqp_envelope.delivery_tag) 

    coap_request = aiocoap.Message(code=aiocoap_code, uri=aiocoap_uri, payload=aiocoap_payload) 
    coap_request.opt.observe = 0 

    task = asyncio.ensure_future(send_coap_obs_request(amqp_envelope, amqp_properties, request_dict, coap_request)) 
    # we have to keep a hard ref to this task, otherwise the python garbage collector destroyes the task before it is completed. See https://bugs.python.org/issue21163 
    # this is apparent from the "Task was destroyed but it is pending" exception thrown after random (lengthy) time intervals, probably the time interval is related to when the gc is triggered 
    # await task # this does not seem to work, as it prevents new amqp operations from executing (e.g. amqp channels do not get created) 
    # we are actually not interested in waiting for the task anyway, so instead just keep a hard ref to the task in the obs_tasks list 
    obs_tasks.append(task) # TODO: when do we remove the task from the list? 


async def amqp_connect(): 
    try: 
     (transport, protocol) = await aioamqp.connect('localhost', 5672) 
     print(" [x] Connected to AMQP broker") 
     return (transport, protocol) 
    except aioamqp.AmqpClosedConnection as ex: 
     print("closed connections: {}".format(ex)) 
     raise ex 


async def main(): 
    """Open AMQP connection to broker, subscribe to coap_request_rpc_queue and setup aiocoap client context """ 

    try: 
     global amqp_protocol 
     (amqp_transport, amqp_protocol) = await amqp_connect() 

     channel = await amqp_protocol.channel() 

     await channel.queue_declare(queue_name='coap_request_rpc_queue') 
     await channel.basic_qos(prefetch_count=10, prefetch_size=0, connection_global=False) 
     await channel.basic_consume(on_rpc_request, queue_name='coap_request_rpc_queue') 

     print(" [x] Awaiting CoAP request RPC requests") 
    except aioamqp.AmqpClosedConnection as ex: 
     print("amqp_connect: closed connections: {}".format(ex)) 
     exit() 

    global coap_client_context 
    coap_client_context = await aiocoap.Context.create_client_context() 


if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 

    loop.set_debug(True) 

    asyncio.async(main()) 
    loop.run_forever() 

回答

0

當一個計劃任務,它的_step回調計劃在循環。該回調通過self保持對任務的引用。我沒有檢查過這些代碼,但我非常有信心這個循環保持對它的回調的引用。但是,當任務等待某個等待時間或將來時,_step回調沒有安排。在這種情況下,任務會添加一個完成的回調,該回調保留對任務的引用,但循環不保留對等待期貨的任務的引用。

只要某件事保留了任務等待的未來的參考,一切都很好。然而,如果沒有什麼能保留對未來的堅實參考,那麼未來可以收集垃圾,當發生這種情況時,任務可以收集垃圾。

所以,我會尋找你的任務調用的任務在將來任務正在等待的事情可能不會被引用。 總的來說,未來需要被引用,所以最終有人可以設定它的結果,所以如果你有未參考的未來,這很可能是一個錯誤。