Python asyncio:未引用的任务被垃圾收集器销毁?

Python asyncio:未引用的任务被垃圾收集器销毁?

问题描述:

我正在编写一个程序,它通过AMQP接收RPC请求,并据此执行网络请求(CoAP)。处理RPC请求时,aioamqp的回调会创建负责网络IO的任务。这些任务可以视为后台任务:它们会无限期运行,并通过AMQP持续传回网络响应(在本例中,一个RPC请求会触发一个RPC响应以及一个后续的数据流)。

我注意到在最初的代码中,网络任务会在看似随机的时间点(尚未完成时)被销毁,随后asyncio会打印警告 “Task was destroyed but it is pending!”(任务已被销毁,但仍处于挂起状态)。该问题与这里描述的问题类似:https://bugs.python.org/issue21163

目前我通过在模块级列表中保存任务的强引用绕过了这个问题,这样GC就不会销毁任务对象。但我想知道是否有更好的解决办法?理想情况下,我希望在RPC回调中直接 await 该任务,但我发现这样会阻止后续所有AMQP操作完成——例如,创建新的AMQP channel 会卡住,通过AMQP接收RPC请求也会停止。我不确定是什么导致了这种阻塞(既然回调本身就是一个协程,我原以为 await 不会阻塞整个 aioamqp 库)。

下面贴出的RPC客户端和服务器源代码均基于 aioamqp/aiocoap 的示例。在服务器中,on_rpc_request 是AMQP RPC回调;send_coap_obs_request 是网络协程,一旦删除 “obs_tasks.append(task)” 语句,它就会被销毁。

client.py:

""" 
    CoAP RPC client, based on aioamqp implementation of RPC examples from RabbitMQ tutorial 
""" 

import base64 
import json 
import uuid 

import asyncio 
import aioamqp 


class CoAPRpcClient(object):
    """Minimal AMQP RPC client: publish one request, wait for the matching reply.

    Requests go to the ``coap_request_rpc_queue`` queue with ``reply_to`` set to
    an exclusive, server-named callback queue; the reply is matched by its
    correlation id.
    """

    def __init__(self):
        self.transport = None
        self.protocol = None
        self.channel = None
        self.callback_queue = None
        # Correlation id of the request in flight and the body of its reply.
        # Initialised here so on_response() never hits an AttributeError if a
        # message arrives before the first call().
        self.corr_id = None
        self.response = None
        self.waiter = asyncio.Event()

    async def connect(self):
        """Open the AMQP connection, a channel and an exclusive reply queue.

        An ``__init__`` method can't be a coroutine, so call() does this lazily.
        """
        self.transport, self.protocol = await aioamqp.connect()
        self.channel = await self.protocol.channel()

        result = await self.channel.queue_declare(queue_name='', exclusive=True)
        self.callback_queue = result['queue']

        await self.channel.basic_consume(
            self.on_response,
            no_ack=True,
            queue_name=self.callback_queue,
        )

    async def on_response(self, channel, body, envelope, properties):
        """Consumer callback: store the reply whose correlation id matches ours."""
        if self.corr_id is not None and self.corr_id == properties.correlation_id:
            self.response = body
            # Wake call() only for *our* reply; waking on any message would let
            # call() return while self.response is still None.
            self.waiter.set()

    async def call(self, n):
        """Publish ``str(n)`` as an RPC request and return the JSON-decoded reply.

        The connection is closed once the reply arrives; a later call()
        reconnects.
        """
        if not self.protocol:
            await self.connect()
        self.response = None
        self.corr_id = str(uuid.uuid4())
        # Reset the event so that a reply to an earlier call() is not taken as
        # the reply to this one.
        self.waiter.clear()
        await self.channel.basic_publish(
            payload=str(n),
            exchange_name='',
            routing_key='coap_request_rpc_queue',
            properties={
                'reply_to': self.callback_queue,
                'correlation_id': self.corr_id,
            },
        )
        await self.waiter.wait()

        await self.protocol.close()
        # The connection is gone: forget it so the next call() reconnects
        # instead of publishing on a closed channel.
        self.transport = None
        self.protocol = None
        self.channel = None
        self.callback_queue = None
        return json.loads(self.response)


async def rpc_client():
    """Send one (empty) CoAP request over AMQP RPC and print the decoded reply."""
    client = CoAPRpcClient()

    payload_json = json.dumps({})

    print(" [x] Send RPC coap_request({})".format(payload_json))
    reply = await client.call(payload_json)
    print(" [.] Got {}".format(reply))


if __name__ == "__main__":
    # Only contact the broker when run as a script, not when the module is
    # imported.
    asyncio.get_event_loop().run_until_complete(rpc_client())

server.py:

""" 
CoAP RPC server, based on aioamqp implementation of RPC examples from RabbitMQ tutorial 
""" 

import base64 
import json 
import sys 

import logging 
import warnings 

import asyncio 
import aioamqp 
import aiocoap 

# AMQP connection protocol; assigned in main() and used by the publish helpers
# to open channels.
amqp_protocol = None
# aiocoap client context; assigned in main() and used by send_coap_obs_request().
coap_client_context = None
# Strong references to the background tasks started by on_rpc_request(), so
# they are not garbage-collected while still pending.
obs_tasks = []

# Exchange that CoAP observation notifications are published to
# (see handle_coap_notification()).
AMQP_COAP_NOTIFICATIONS_EXCHANGE_NAME = 'topic_coap'
# Second argument to channel.exchange() when declaring the exchange
# (presumably the exchange type — confirm against the aioamqp API).
AMQP_COAP_NOTIFICATIONS_TOPIC_NAME = 'topic'
# Routing key used when publishing notifications.
AMQP_COAP_NOTIFICATIONS_ROUTING_KEY = 'coap.response'

def create_response_dict(coap_request, coap_response):
    """Summarise a CoAP exchange as a JSON-serialisable dict.

    The result always has ``request_uri`` and ``code``; ``payload`` (base64
    text) is included only when the response body is non-empty.
    """
    result = {
        'request_uri': coap_request.get_request_uri(),
        'code': coap_response.code,
    }

    body = coap_response.payload
    if len(body):
        result['payload'] = base64.b64encode(body).decode('utf-8')

    return result


async def handle_coap_response(amqp_envelope, amqp_properties, coap_request, coap_response):
    """Publish the first CoAP response as the reply to the originating RPC request.

    :param amqp_envelope: envelope of the RPC request; kept for interface
        compatibility. The request itself is acknowledged by on_rpc_request
        on the channel that delivered it.
    :param amqp_properties: properties of the RPC request; ``reply_to`` is
        used as the routing key and ``correlation_id`` is echoed back.
    :param coap_request: the CoAP request that was sent.
    :param coap_response: the CoAP response to forward.
    """
    response_dict = create_response_dict(coap_request, coap_response)
    message = json.dumps(response_dict)

    # Use a short-lived channel and close it afterwards so channels do not
    # accumulate on the shared connection.
    amqp_channel = await amqp_protocol.channel()
    try:
        await amqp_channel.basic_publish(
            payload=message,
            exchange_name='',
            routing_key=amqp_properties.reply_to,
            properties={
                'correlation_id': amqp_properties.correlation_id,
            },
        )
        # No basic_client_ack here. Delivery tags are scoped to the channel
        # that delivered the message, so acking it on this new channel is a
        # protocol error. The request has also already been acked in
        # on_rpc_request.
    finally:
        if amqp_channel.is_open:
            await amqp_channel.close()

    print(" [.] handle_coap_response() published response: {}".format(response_dict))


# Strong references to in-flight notification tasks. The event loop keeps only
# weak references to tasks, so an otherwise unreferenced task can be
# garbage-collected before it finishes ("Task was destroyed but it is pending").
_notification_tasks = set()


def incoming_observation(coap_request, coap_response):
    """aiocoap observation callback: forward one notification over AMQP.

    Schedules handle_coap_notification() as a background task and holds a
    reference to it until it completes.
    """
    # asyncio.async() is a SyntaxError on Python 3.7+ (``async`` is a keyword);
    # ensure_future() is the equivalent spelling.
    task = asyncio.ensure_future(handle_coap_notification(coap_request, coap_response))
    _notification_tasks.add(task)
    task.add_done_callback(_notification_tasks.discard)


async def handle_coap_notification(coap_request, coap_response):
    """Publish one CoAP observation notification to the notifications exchange.

    :param coap_request: the observed CoAP request.
    :param coap_response: the notification received for it.
    """
    response_dict = create_response_dict(coap_request, coap_response)
    message = json.dumps(response_dict)

    # One short-lived channel per notification. It is closed afterwards;
    # otherwise every notification of a long-running observation would leak
    # a channel on the shared connection.
    amqp_channel = await amqp_protocol.channel()
    try:
        await amqp_channel.exchange(AMQP_COAP_NOTIFICATIONS_EXCHANGE_NAME, AMQP_COAP_NOTIFICATIONS_TOPIC_NAME)

        await amqp_channel.publish(message, exchange_name=AMQP_COAP_NOTIFICATIONS_EXCHANGE_NAME, routing_key=AMQP_COAP_NOTIFICATIONS_ROUTING_KEY)
    finally:
        if amqp_channel.is_open:
            await amqp_channel.close()

    print(" [.] handle_coap_notification() published response: {}".format(response_dict))


async def send_coap_obs_request(amqp_envelope, amqp_properties, request_dict, coap_request):
    """Send an observing CoAP request and relay its results over AMQP.

    The first successful response is published as the RPC reply via
    handle_coap_response(). Later notifications are forwarded by the
    registered observation callback (incoming_observation). The coroutine
    returns when the observation's errback fires, when the request fails, or
    when it is cancelled. In every case a still-pending request or
    observation is cancelled on the way out.

    :param amqp_envelope: envelope of the RPC request, passed through to
        handle_coap_response().
    :param amqp_properties: properties of the RPC request, passed through to
        handle_coap_response().
    :param request_dict: request description, used here only for logging.
    :param coap_request: the aiocoap.Message to send.
    """
    # socket.gaierror is named in an except clause below, but socket is not
    # imported at module level. Without this import, any request error became
    # a NameError.
    import socket

    observation_is_over = asyncio.Future()
    # Bound before the try so the finally clause cannot raise
    # UnboundLocalError when coap_client_context.request() itself fails.
    requester = None
    try:
        requester = coap_client_context.request(coap_request)
        requester.observation.register_errback(observation_is_over.set_result)
        requester.observation.register_callback(
            lambda data, coap_request=coap_request: incoming_observation(coap_request, data))

        try:
            print(" [..] Sending CoAP obs request: {}".format(request_dict))
            coap_response = await requester.response
        except socket.gaierror as e:
            print("Name resolution error:", e, file=sys.stderr)
            return
        except OSError as e:
            print("Error:", e, file=sys.stderr)
            return

        if not coap_response.code.is_successful():
            print(coap_response.code, file=sys.stderr)
            if coap_response.payload:
                print(coap_response.payload.decode('utf-8'), file=sys.stderr)
            # Return rather than sys.exit(): SystemExit raised inside a task
            # propagates out of the event loop and would stop the whole server
            # because of a single failed CoAP request.
            return

        print(" [..] Received CoAP response: {}".format(coap_response))
        await handle_coap_response(amqp_envelope, amqp_properties, coap_request, coap_response)

        exit_reason = await observation_is_over
        print("Observation is over: %r" % (exit_reason,), file=sys.stderr)

    finally:
        if requester is not None:
            if not requester.response.done():
                requester.response.cancel()
            if not requester.observation.cancelled:
                requester.observation.cancel()


async def on_rpc_request(amqp_channel, amqp_body, amqp_envelope, amqp_properties):
    """AMQP consumer callback for ``coap_request_rpc_queue``.

    Acknowledges the RPC request, then starts an observing CoAP request as a
    background task. The task is not awaited here; a strong reference is kept
    in ``obs_tasks`` until it finishes.
    """
    print(" [.] on_rpc_request(): received RPC request: {}".format(amqp_body))

    request_dict = {}  # hardcoded to vdna.be for SO example
    aiocoap_code = aiocoap.GET
    aiocoap_uri = "coap://vdna.be/obs"
    # CoAP payloads are bytes.
    aiocoap_payload = b""

    # We are ready to send the CoAP request, so ack now to tell the client the
    # RPC request was received.
    await amqp_channel.basic_client_ack(delivery_tag=amqp_envelope.delivery_tag)

    coap_request = aiocoap.Message(code=aiocoap_code, uri=aiocoap_uri, payload=aiocoap_payload)
    coap_request.opt.observe = 0

    task = asyncio.ensure_future(send_coap_obs_request(amqp_envelope, amqp_properties, request_dict, coap_request))
    # Keep a strong reference: the event loop only references tasks weakly, so
    # an unreferenced task can be garbage-collected while pending
    # ("Task was destroyed but it is pending", see https://bugs.python.org/issue21163).
    # The task is deliberately not awaited. Awaiting it here stalled all further
    # AMQP operations, presumably because aioamqp awaits consumer callbacks
    # from its frame-dispatch loop (NOTE(review): confirm against aioamqp).
    obs_tasks.append(task)
    # Drop the reference once the task completes so obs_tasks does not grow
    # without bound.
    task.add_done_callback(obs_tasks.remove)


async def amqp_connect():
    """Connect to the local AMQP broker and return ``(transport, protocol)``.

    A closed-connection error is logged and then re-raised to the caller.
    """
    try:
        transport, protocol = await aioamqp.connect('localhost', 5672)
    except aioamqp.AmqpClosedConnection as exc:
        print("closed connections: {}".format(exc))
        raise

    print(" [x] Connected to AMQP broker")
    return transport, protocol


async def main():
    """Set up the aiocoap client context, then subscribe to coap_request_rpc_queue.

    The CoAP client context is created *before* consuming starts. Once
    basic_consume() is issued, the broker may deliver requests that were
    already waiting in the queue, and on_rpc_request -> send_coap_obs_request
    needs coap_client_context to be set.
    """
    global amqp_protocol
    global coap_client_context

    coap_client_context = await aiocoap.Context.create_client_context()

    try:
        (amqp_transport, amqp_protocol) = await amqp_connect()

        channel = await amqp_protocol.channel()

        await channel.queue_declare(queue_name='coap_request_rpc_queue')
        await channel.basic_qos(prefetch_count=10, prefetch_size=0, connection_global=False)
        await channel.basic_consume(on_rpc_request, queue_name='coap_request_rpc_queue')

        print(" [x] Awaiting CoAP request RPC requests")
    except aioamqp.AmqpClosedConnection as ex:
        print("amqp_connect: closed connections: {}".format(ex))
        # sys.exit() instead of the site-provided exit(), which is not
        # guaranteed to exist (e.g. under ``python -S``). A non-zero status
        # signals the failure.
        sys.exit(1)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    loop.set_debug(True)

    # Run setup to completion before serving. This keeps main() referenced
    # while it runs and surfaces setup errors immediately, rather than as a
    # "Task exception was never retrieved" log. It also avoids asyncio.async(),
    # which is a SyntaxError on Python 3.7+.
    loop.run_until_complete(main())
    loop.run_forever()

当一个任务被调度时,它的 _step 回调会被安排到事件循环中执行,而该回调通过 self 持有对任务的引用。我没有核对过相关代码,但我相当确定事件循环会持有对其回调的引用。然而,当任务正在等待某个 awaitable 或 future 时,_step 回调并没有被调度。此时,任务会在该 future 上注册一个完成回调,该回调持有对任务的引用;但事件循环本身并不持有对正在等待 future 的任务的引用。

只要有某个对象持有任务所等待的 future 的引用,一切都没问题。然而,如果没有任何对象持有该 future 的强引用,这个 future 就可能被垃圾回收;一旦发生这种情况,任务本身也可能随之被垃圾回收。

因此,我建议检查你的任务所调用的代码,看看其中是否存在任务正在等待、却没有被任何对象引用的 future。一般来说,future 必须被引用,这样最终才会有人为它设置结果;所以如果存在未被引用的 future,这很可能就是一个 bug。