Python3 ASYNCIO创建从一个初始

Python3 ASYNCIO创建从一个初始

问题描述:

我试图添加两个协程到ASYNCIO环,却得到了错误的辅助连接:Python3 ASYNCIO创建从一个初始

RuntimeError: This event loop is already running 

我的目标是要传达给服务器(即我没有控制)。该服务器需要来自客户端的初始连接。然后服务器在此连接上为客户端提供一个端口。客户端必须使用此端口创建第二个连接。服务器使用此第二个连接向客户端发送未经请求的消息。第一个连接始终保持其他双向通信。

要重新创建这种情况下,我有一些代码,再现错误:发生ServerConnection.setup_connection

class Connection(): 
    def __init__(self, ip, port, ioloop): 
     self.ip = ip 
     self.port = port 
     self.ioloop = ioloop 
     self.reader, self.writer = None, None 
     self.protocol = None 
     self.fileno = None 

    async def __aenter__(self): 
     # Applicable when doing 'with Connection(...' 
     log.info("Entering and Creating Connection") 
     self.reader, self.writer = (
      await asyncio.open_connection(self.ip, self.port, loop=self.ioloop) 
     ) 
     self.protocol = self.writer.transport.get_protocol() 
     self.fileno = self.writer.transport.get_extra_info('socket').fileno() 

     log.info(f"Created connection {self}") 
     return self 

    async def __aexit__(self, *args): 
     # Applicable when doing 'with Connection(...' 
     log.info(f"Exiting and Destroying Connection {self}") 
     if self.writer: 
      self.writer.close() 

    def __await__(self): 
     # Applicable when doing 'await Connection(...' 
     return self.__aenter__().__await__() 

    def __repr__(self): 
     return f"[Connection {self.ip}:{self.port}, {self.protocol}, fd={self.fileno}]" 

    async def send_recv_message(self, message): 
     log.debug(f"send: '{message}'") 
     self.writer.write(message.encode()) 
     await self.writer.drain() 

     log.debug("awaiting data...") 
     data = await self.reader.read(9999) 
     data = data.decode() 
     log.debug(f"recv: '{data}'") 
     return data 


class ServerConnection(Connection): 
    async def setup_connection(self): 
     event_port = 8889 # Assume this came from the server 
     print("In setup connection") 
     event_connection = await EventConnection('127.0.0.1', event_port, self.ioloop) 
     self.ioloop.run_until_complete(event_connection.recv_message()) 

class EventConnection(Connection): 
    async def recv_message(self): 
     log.debug("awaiting recv-only data...") 
     data = await self.reader.read(9999) 
     data = data.decode() 
     log.debug(f"recv only: '{data}'") 
     return data 


async def main(loop): 
    client1 = await ServerConnection('127.0.0.1', 8888, loop) 
    await client1.setup_connection() 
    await client1.send_recv_message("Hello1") 
    await client1.send_recv_message("Hello2") 
    await asyncio.sleep(5) 

if __name__ == '__main__': 
    #logging.basicConfig(level=logging.INFO) 
    logging.basicConfig(level=logging.DEBUG) 
    log = logging.getLogger() 
    ioloop = asyncio.get_event_loop() 
    print('starting loop') 
    ioloop.run_until_complete(main(ioloop)) 
    print('completed loop') 
    ioloop.close() 

错误的位置run_until_complete被调用()方法。

由于缺乏对asyncio的理解,我可能做错了什么。基本上,我如何设置一个辅助连接,在建立第一个连接时获得事件通知(未经请求)?

谢谢。

跟进

由于代码非常相似(一些变更,添加更多功能的话),我希望这不是坏的礼仪随动原来的岗位作为所产生的误差仍然是相同的。

新问题是,当它收到未经请求的消息(由EventConnection接收)时,recv_message调用process_data方法。我想使process_data成为未来,以便recv_message完成(ioloop应该停止)。然后ensure_future会选择它并继续再次运行以使用ServerConnection对服务器执行请求/响应。然而,在它之前,它必须去一些用户代码(由external_command()表示,并且我更愿意隐藏异步内容)。这将使它再次同步。因此,一旦他们完成了所需的工作,他们应该在ServerConnection上调用execute_command,然后再次启动循环。

问题是,我对使用ensure_future的期望没有平移,因为它看起来循环没有停止运行。因此,当代码执行达到执行run_until_complete的execute_command时,会发生错误“此事件循环已在运行”的异常。

我有两个问题:

  1. 我怎样才能让这个后process_data被 放入ensure_future的ioloop可以停止,随后能够再次运行在execute_command ?

  2. 一旦recv_message收到了一些东西,我们如何使它能够让 它可以接收更多未经请求的数据?只需使用 ensure_future再次调用自己是否足够/安全?

下面是模拟此问题的示例代码。

client1 = None 

class ServerConnection(Connection): 
    connection_type = 'Server Connection' 
    async def setup_connection(self): 
     event_port = 8889 # Assume this came from the server 
     print("In setup connection") 
     event_connection = await EventConnection('127.0.0.1', event_port, self.ioloop) 
     asyncio.ensure_future(event_connection.recv_message()) 

    async def _execute_command(self, data): 
     return await self.send_recv_message(data) 

    def execute_command(self, data): 
     response_str = self.ioloop.run_until_complete(self._execute_command(data)) 
     print(f"exec cmd response_str: {response_str}") 

    def external_command(self, data): 
     self.execute_command(data) 


class EventConnection(Connection): 
    connection_type = 'Event Connection' 
    async def recv_message(self): 
     global client1 
     log.debug("awaiting recv-only data...") 
     data = await self.reader.read(9999) 
     data = data.decode() 
     log.debug(f"recv-only: '{data}'") 
     asyncio.ensure_future(self.process_data(data)) 
     asyncio.ensure_future(self.recv_message()) 

    async def process_data(self, data): 
     global client1 
     await client1.external_command(data) 


async def main(ioloop): 
    global client1 
    client1 = await ServerConnection('127.0.0.1', 8888, ioloop) 
    await client1.setup_connection() 
    print(f"after connection setup loop running is {ioloop.is_running()}") 
    await client1.send_recv_message("Hello1") 
    print(f"after Hello1 loop running is {ioloop.is_running()}") 
    await client1.send_recv_message("Hello2") 
    print(f"after Hello2 loop running is {ioloop.is_running()}") 
    while True: 
     print(f"inside while loop running is {ioloop.is_running()}") 
     t = 10 
     print(f"asyncio sleep {t} sec") 
     await asyncio.sleep(t) 


if __name__ == '__main__': 
    logging.basicConfig(level=logging.DEBUG) 
    log = logging.getLogger() 
    ioloop = asyncio.get_event_loop() 
    print('starting loop') 
    ioloop.run_until_complete(main(ioloop)) 
    print('completed loop') 
    ioloop.close() 
+0

您要创建在同一时间2连接?为什么不使用'ayncio.gather'?用这种方法,你可以启动2个异步操作... –

+0

asyncio.gather似乎不适用于我的情况,原因有两个。其一,它应该按照上市顺序收集结果,另一方面它似乎想把所有的未来都列入清单。就我而言,我希望现在通过一个未来,并在第一个未来收到端口后的第二个未来(第二个连接)。 – bhairav

尝试更换:

self.ioloop.run_until_complete 

随着

await 
+0

我曾试过,但它会在等待EventConnection recv_message方法时挂起。我实际上用asyncio.ensure_future替换了它,这似乎已经完成了这个诀窍。然而,我仍然遇到了另一个复杂的层面,我又遇到了类似的问题(我一直在慢慢构建这个框架以满足我的需要)。将其添加为原始问题的更新。 – bhairav