如何使用autobahn asyncio实现交互式websocket客户端?
我试图实现一个使用高速公路| python 和asyncio
的websocket/wamp客户端,虽然它有点工作,但也有一些零件不知道 。如何使用autobahn asyncio实现交互式websocket客户端?
我真正想要做的是在qt5/QML中实现WAMP,但这个 看起来像是一个更容易的路径。
这个简化的客户端大部分都是从网上复制过来的。它在onJoin
发生时读取 时间服务。
我想要做的就是触发这个从外部来源读取。
我采取的复杂方法是在 线程中运行asyncio
事件循环,然后通过套接字发送命令来触发读取。 I 到目前为止还无法确定在哪里放置例程/协程,以便可以从读者例行程序中找到它。
我怀疑有一个更简单的方法去做这件事,但我还没有发现它 呢。欢迎提出建议。
#!/usr/bin/python3
try:
import asyncio
except ImportError:
## Trollius >= 0.3 was renamed
import trollius as asyncio
from autobahn.asyncio import wamp, websocket
import threading
import time
from socket import socketpair
rsock, wsock = socketpair()
def reader() :
data = rsock.recv(100)
print("Received:", data.decode())
class MyFrontendComponent(wamp.ApplicationSession):
def onConnect(self):
self.join(u"realm1")
@asyncio.coroutine
def onJoin(self, details):
print('joined')
## call a remote procedure
##
try:
now = yield from self.call(u'com.timeservice.now')
except Exception as e:
print("Error: {}".format(e))
else:
print("Current time from time service: {}".format(now))
def onLeave(self, details):
self.disconnect()
def onDisconnect(self):
asyncio.get_event_loop().stop()
def start_aloop() :
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
transport_factory = websocket.WampWebSocketClientFactory(session_factory,
debug = False,
debug_wamp = False)
coro = loop.create_connection(transport_factory, '127.0.0.1', 8080)
loop.add_reader(rsock,reader)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
if __name__ == '__main__':
session_factory = wamp.ApplicationSessionFactory()
session_factory.session = MyFrontendComponent
## 4) now enter the asyncio event loop
print('starting thread')
thread = threading.Thread(target=start_aloop)
thread.start()
time.sleep(5)
print("IN MAIN")
# emulate an outside call
wsock.send(b'a byte string')
您可以监听事件循环内的插座异步,使用loop.sock_accept
。你可以只调用一个协同程序设置的onConnect
或onJoin
插座内:
try:
import asyncio
except ImportError:
## Trollius >= 0.3 was renamed
import trollius as asyncio
from autobahn.asyncio import wamp, websocket
import socket
class MyFrontendComponent(wamp.ApplicationSession):
def onConnect(self):
self.join(u"realm1")
@asyncio.coroutine
def setup_socket(self):
# Create a non-blocking socket
self.sock = socket.socket()
self.sock.setblocking(0)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind(('localhost', 8889))
self.sock.listen(5)
loop = asyncio.get_event_loop()
# Wait for connections to come in. When one arrives,
# call the time service and disconnect immediately.
while True:
conn, address = yield from loop.sock_accept(self.sock)
yield from self.call_timeservice()
conn.close()
@asyncio.coroutine
def onJoin(self, details):
print('joined')
# Setup our socket server
asyncio.async(self.setup_socket())
## call a remote procedure
##
yield from self.call_timeservice()
@asyncio.coroutine
def call_timeservice(self):
try:
now = yield from self.call(u'com.timeservice.now')
except Exception as e:
print("Error: {}".format(e))
else:
print("Current time from time service: {}".format(now))
... # The rest is the same
感谢您的答复端午。不完全是我需要的解决方案,但它指出了我正确的方向。是的,我希望客户端可以通过外部触发器远程调用RPC。
我来到了,让我通过为特定调用的字符串(虽然只有一个实现现在)
这就是我想出了以下,虽然我不知道如何优雅它是。
import asyncio
from autobahn.asyncio import wamp, websocket
import threading
import time
import socket
rsock, wsock = socket.socketpair()
class MyFrontendComponent(wamp.ApplicationSession):
def onConnect(self):
self.join(u"realm1")
@asyncio.coroutine
def setup_socket(self):
# Create a non-blocking socket
self.sock = rsock
self.sock.setblocking(0)
loop = asyncio.get_event_loop()
# Wait for connections to come in. When one arrives,
# call the time service and disconnect immediately.
while True:
rcmd = yield from loop.sock_recv(rsock,80)
yield from self.call_service(rcmd.decode())
@asyncio.coroutine
def onJoin(self, details):
# Setup our socket server
asyncio.async(self.setup_socket())
@asyncio.coroutine
def call_service(self,rcmd):
print(rcmd)
try:
now = yield from self.call(rcmd)
except Exception as e:
print("Error: {}".format(e))
else:
print("Current time from time service: {}".format(now))
def onLeave(self, details):
self.disconnect()
def onDisconnect(self):
asyncio.get_event_loop().stop()
def start_aloop() :
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
transport_factory = websocket.WampWebSocketClientFactory(session_factory,
debug = False,
debug_wamp = False)
coro = loop.create_connection(transport_factory, '127.0.0.1', 8080)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
if __name__ == '__main__':
session_factory = wamp.ApplicationSessionFactory()
session_factory.session = MyFrontendComponent
## 4) now enter the asyncio event loop
print('starting thread')
thread = threading.Thread(target=start_aloop)
thread.start()
time.sleep(5)
wsock.send(b'com.timeservice.now')
time.sleep(5)
wsock.send(b'com.timeservice.now')
time.sleep(5)
wsock.send(b'com.timeservice.now')
您可以使用'asyncio.sleep'和'loop.socket_sendall'以更多'asyncio'友好的方式实现'wsock'。这样你就不需要后台线程。相反,您只需使用'asyncio.async'来调度协程,该调用称为'asyncio.sleep(5)yield',接着是'loop.sock_sendall(wsock,b'com.timeservice.now')yield'。 – dano 2014-11-03 19:30:00
谢谢,我会继续研究这个,但我只用了几天,但还没有完全达到速度。我将循环放在后台,因为这些代码在pyrorside中使用,前台函数由QML代码调用。可能有更好的方法来做到这一点,我怀疑它会及时变得更清晰。 – morris 2014-11-03 20:23:13
啊,如果你打算把这个代码集成到一个不能插入'asyncio'事件循环的应用程序中,那么你在这里得到的是正确的想法。 – dano 2014-11-03 20:26:22
因此,您希望能够通过某种外部方式触发客户端对timeservice进行RPC调用? – dano 2014-11-03 02:26:17