Python - asyncio协议/服务器之间的通信

问题描述:

我正在尝试编写一个服务器端事件服务器,我可以使用telnet连接到该服务器,并将telnet内容推送到浏览器。使用Python和asyncio的想法是使用尽可能小的CPU,因为它将在Raspberry Pi上运行。Python - asyncio协议/服务器之间的通信

到目前为止,我有以下使用在这里找到的库:https://pypi.python.org/pypi/asyncio-sse/0.1它使用asyncio。

而且我也复制了一个使用asyncio的telnet服务器。

两者都单独工作,但我不知道如何将两者结合在一起。据我了解,我需要在Telnet.data_receivedSSEHandler类中调用send(),但我不知道如何访问它。这两个“服务器”都需要在一个循环中运行,以接受新的连接或推送数据。

任何人都可以帮忙,或指向另一个方向吗?我处于我不知道谷歌的阶段

import asyncio 
import sse 

# Get an instance of the asyncio event loop 
loop = asyncio.get_event_loop() 

# Setup SSE address and port 
sse_host, sse_port = '192.168.2.25', 8888 

class Telnet(asyncio.Protocol): 
    def connection_made(self, transport): 
     print("Connection received!"); 
     self.transport = transport 

    def data_received(self, data): 
     print(data) 
     self.transport.write(b'echo:') 
     self.transport.write(data) 

     # This is where I want to send data via SSE 
     # SSEHandler.send(data) 

     # Things I've tried :(
     #loop.call_soon_threadsafe(SSEHandler.handle_request()); 
     #loop.call_soon_threadsafe(sse_server.send("PAH!")); 

    def connection_lost(self, esc): 
     print("Connection lost!") 
     telnet_server.close() 

class SSEHandler(sse.Handler): 
    @asyncio.coroutine 
    def handle_request(self): 
     self.send('Working') 

# SSE server 
sse_server = sse.serve(SSEHandler, sse_host, sse_port) 

# Telnet server 
telnet_server = loop.run_until_complete(loop.create_server(Telnet, '192.168.2.25', 7777)) 

#telnet_server.something = sse_server; 

loop.run_until_complete(sse_server) 
loop.run_until_complete(telnet_server.wait_closed()) 

服务器端事件是一种http协议;并且您可能在任何特定时刻都有任何并发​​http请求在运行,如果没有人连接或几十个,您可能有零个。这种细微差别全部包含在两个sse.servesse.Handler构造中;前者代表一个单一的侦听端口,将每个单独的客户请求分派给后者。

此外,sse.Handler.handle_request()为每个客户端调用一次,并且一旦该协程序终止,客户端就会断开连接。在你的代码中,该协程立即终止,所以客户端看到一个“工作”事件。所以,我们需要等待,或多或少永远。我们可以通过yield from来做到这一点asyncio.Future()

第二个问题是,我们需要以某种方式获得对SSEHandler()的所有单独实例的控制权,并在其中每个方法上使用send()方法。那么,我们可以让他们自己注册他们的handle_request()方法;通过将每个处理程序实例添加到一个将它们映射到它们正在等待的未来的字典中。

class SSEHandler(sse.Handler): 
    _instances = {} 

    @asyncio.coroutine 
    def handle_request(self): 
     self.send('Working') 
     my_future = asyncio.Future() 
     SSEHandler._instances[self] = my_future 
     yield from my_future 

现在,对事件发送到每一个聆听我们刚刚访问所有在我们创建的字典中注册的SSEHandler实例和对每一个使用send()

class SSEHandler(sse.Handler): 

    #... 

    @classmethod 
    def broadcast(cls, message): 
     for instance, future in cls._instances.items(): 
      instance.send(message) 

class Telnet(asyncio.Protocol): 

    #... 

    def data_received(self, data): 
     #... 
     SSEHandler.broadcast(data.decode('ascii')) 

最后,当telnet连接关闭时,您的代码将退出。这很好,但我们当时也应该清理。幸运的是,这只是一个为所有的处理程序

class SSEHandler(sse.Handler): 

    #... 

    @classmethod 
    def abort(cls): 
     for instance, future in cls._instances.items(): 
      future.set_result(None) 
     cls._instances = {} 

class Telnet(asyncio.Protocol): 

    #... 

    def connection_lost(self, esc): 
     print("Connection lost!") 
     SSEHandler.abort() 
     telnet_server.close() 

的所有期货的设定结果的事这里是万一一个完整的,工作转储我的例证并不明显。

import asyncio 
import sse 

loop = asyncio.get_event_loop() 
sse_host, sse_port = '0.0.0.0', 8888 

class Telnet(asyncio.Protocol): 
    def connection_made(self, transport): 
     print("Connection received!"); 
     self.transport = transport 

    def data_received(self, data): 
     SSEHandler.broadcast(data.decode('ascii')) 

    def connection_lost(self, esc): 
     print("Connection lost!") 
     SSEHandler.abort() 
     telnet_server.close() 

class SSEHandler(sse.Handler): 
    _instances = {} 
    @classmethod 
    def broadcast(cls, message): 
     for instance, future in cls._instances.items(): 
      instance.send(message) 

    @classmethod 
    def abort(cls): 
     for instance, future in cls._instances.items(): 
      future.set_result(None) 
     cls._instances = {} 

    @asyncio.coroutine 
    def handle_request(self): 
     self.send('Working') 
     my_future = asyncio.Future() 
     SSEHandler._instances[self] = my_future 
     yield from my_future 

sse_server = sse.serve(SSEHandler, sse_host, sse_port) 
telnet_server = loop.run_until_complete(loop.create_server(Telnet, '0.0.0.0', 7777)) 
loop.run_until_complete(sse_server) 
loop.run_until_complete(telnet_server.wait_closed()) 
+0

非常感谢你解释如何和为什么,非常非常有用。看起来我还有很多东西需要学习。 我得到你的示例代码运行,它正在做我想要/需要的,和非常低的CPU。再次感谢 – 2014-09-02 15:25:43