websockets,asyncio和PyQt5在一起。 Quamash是必要的吗?
问题描述:
我一直在使用PyQt5和围绕asyncio构建的websockets模块的客户端。我认为像下面的代码可以工作,但我发现传入的数据(从服务器)没有在GUI中更新,直到我点击输入行编辑框中。这些传入的消息旨在为GUI的更新设置脉冲,并将携带用于更新的数据。 quamash是更好的方法来处理这个问题吗?顺便说一句,我将使用这个代码的其他方面的进程,所以我不认为它是过度杀伤(在这一点上)。 这是Python 3.6,PyQt5.6(或更高版本)以及当前使用pip安装的任何websockets版本。 https://github.com/aaugustin/websocketswebsockets,asyncio和PyQt5在一起。 Quamash是必要的吗?
客户端:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import asyncio
import websockets
import sys
import time
from multiprocessing import Process, Pipe, Queue
from PyQt5 import QtCore, QtGui, QtWidgets
class ComBox(QtWidgets.QDialog):
def __init__(self):
QtWidgets.QDialog.__init__(self)
self.verticalLayout = QtWidgets.QVBoxLayout(self)
self.groupBox = QtWidgets.QGroupBox(self)
self.groupBox.setTitle("messages from beyond")
self.gridLayout = QtWidgets.QGridLayout(self.groupBox)
self.label = QtWidgets.QLabel(self.groupBox)
self.gridLayout.addWidget(self.label, 0, 0, 1, 1)
self.verticalLayout.addWidget(self.groupBox)
self.lineEdit = QtWidgets.QLineEdit(self)
self.verticalLayout.addWidget(self.lineEdit)
self.lineEdit.editingFinished.connect(self.enterPress)
@QtCore.pyqtSlot()
def enterPress(self):
mytext = str(self.lineEdit.text())
self.inputqueue.put(mytext)
@QtCore.pyqtSlot(str)
def updategui(self, message):
self.label.setText(message)
class Websocky(QtCore.QThread):
updatemaingui = QtCore.pyqtSignal(str)
def __init__(self):
super(Websocky, self).__init__()
def run(self):
while True:
time.sleep(.1)
message = self.outputqueue.get()
try:
self.updatemaingui[str].emit(message)
except Exception as e1:
print("updatemaingui problem: {}".format(e1))
async def consumer_handler(websocket):
while True:
try:
message = await websocket.recv()
outputqueue.put(message)
except Exception as e1:
print(e1)
async def producer_handler(websocket):
while True:
message = inputqueue.get()
await websocket.send(message)
await asyncio.sleep(.1)
async def handler():
async with websockets.connect('ws://localhost:8765') as websocket:
consumer_task = asyncio.ensure_future(consumer_handler(websocket))
producer_task = asyncio.ensure_future(producer_handler(websocket))
done, pending = await asyncio.wait(
[consumer_task, producer_task],
return_when=asyncio.FIRST_COMPLETED,)
for task in pending:
task.cancel()
def start_websockets():
loop = asyncio.get_event_loop()
loop.run_until_complete(handler())
inputqueue = Queue()
outputqueue = Queue()
app = QtWidgets.QApplication(sys.argv)
comboxDialog = ComBox()
comboxDialog.inputqueue = inputqueue
comboxDialog.outputqueue = outputqueue
comboxDialog.show()
webster = Websocky()
webster.outputqueue = outputqueue
webster.updatemaingui[str].connect(comboxDialog.updategui)
webster.start()
p2 = Process(target=start_websockets)
p2.start()
sys.exit(app.exec_())
服务器:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import time
import websockets
# here we'll store all active connections to use for sending periodic messages
connections = []
#@asyncio.coroutine
async def connection_handler(connection, path):
connections.append(connection) # add connection to pool
while True:
msg = await connection.recv()
if msg is None: # connection lost
connections.remove(connection) # remove connection from pool, when client disconnects
break
else:
print('< {}'.format(msg))
#@asyncio.coroutine
async def send_periodically():
while True:
await asyncio.sleep(2) # switch to other code and continue execution in 5 seconds
for connection in connections:
message = str(round(time.time()))
print('> Periodic event happened.')
await connection.send(message) # send message to each connected client
start_server = websockets.serve(connection_handler, 'localhost', 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.ensure_future(send_periodically()) # before blocking call we schedule our coroutine for sending periodic messages
asyncio.get_event_loop().run_forever()
答
不久发布了这个问题后,我意识到这个问题。该行
message = inputqueue.get()
在producer_handler函数中被阻塞。这会导致应该是异步函数来挂起该进程中的所有内容,直到它看到队列中的某些内容为止。我的解决方法是使用提供asyncio兼容队列的aioprocessing模块。因此,它看起来更像这样:
import aioprocessing
async def producer_handler(websocket):
while True:
message = await inputqueue.coro_get()
await websocket.send(message)
await asyncio.sleep(.1)
inputqueue = aioprocessing.AioQueue()
aioprocessing模块提供了一些不错的选项和文档。在这种情况下,这个问题是一个相当简单的解决方案。 https://github.com/dano/aioprocessing 所以,回答我的问题:不,你不必为这种事情使用quamash。