逐步创造异步任务,并等待他们全部完成
我试图做一个程序,使很多网络套接字连接到我创建了服务器:逐步创造异步任务,并等待他们全部完成
class WebSocketClient():
@asyncio.coroutine
def run(self):
print(self.client_id, 'Connecting')
ws = yield from aiohttp.ws_connect(self.url)
print(self.client_id, 'Connected')
print(self.client_id, 'Sending the message')
ws.send_str(self.make_new_message())
while not ws.closed:
msg = yield from ws.receive()
if msg.tp == aiohttp.MsgType.text:
print(self.client_id, 'Received the echo')
yield from ws.close()
break
print(self.client_id, 'Closed')
@asyncio.coroutine
def make_clients():
for client_id in range(args.clients):
yield from WebSocketClient(client_id, WS_CHANNEL_URL.format(client_id=client_id)).run()
event_loop.run_until_complete(make_clients())
的问题是,所有客户端完成他们的工作一个接一个:
0 Connecting
0 Connected
0 Sending the message
0 Received the echo
0 Closed
1 Connecting
1 Connected
1 Sending the message
1 Received the echo
1 Closed
...
我试着使用asyncio.wait
,但是所有客户一起开始。我希望它们逐渐创建并在创建它们时立即连接到服务器。同时继续创造新客户。
我应该用什么方法来实现这个目标?
使用asyncio.wait是一种很好的方法。你可以用asyncio.ensure_future和asyncio.sleep结合起来,逐步创建任务:
@asyncio.coroutine
def make_clients(nb_clients, delay):
futures = []
for client_id in range(nb_clients):
url = WS_CHANNEL_URL.format(client_id=client_id)
coro = WebSocketClient(client_id, url).run()
futures.append(asyncio.ensure_future(coro))
yield from asyncio.sleep(delay)
yield from asyncio.wait(futures)
编辑:我实现了一个FutureSet
类,应该做你想要什么。这个集合可以填充期货,并在完成后自动删除它们。也可以等待所有的未来完成。
class FutureSet:
def __init__(self, maxsize, *, loop=None):
self._set = set()
self._loop = loop
self._maxsize = maxsize
self._waiters = []
@asyncio.coroutine
def add(self, item):
if not asyncio.iscoroutine(item) and \
not isinstance(item, asyncio.Future):
raise ValueError('Expecting a coroutine or a Future')
if item in self._set:
return
while len(self._set) >= self._maxsize:
waiter = asyncio.Future(loop=self._loop)
self._waiters.append(waiter)
yield from waiter
item = asyncio.async(item, loop=self._loop)
self._set.add(item)
item.add_done_callback(self._remove)
def _remove(self, item):
if not item.done():
raise ValueError('Cannot remove a pending Future')
self._set.remove(item)
if self._waiters:
waiter = self._waiters.pop(0)
waiter.set_result(None)
@asyncio.coroutine
def wait(self):
return asyncio.wait(self._set)
实施例:
@asyncio.coroutine
def make_clients(nb_clients, limit=0):
futures = FutureSet(maxsize=limit)
for client_id in range(nb_clients):
url = WS_CHANNEL_URL.format(client_id=client_id)
client = WebSocketClient(client_id, url)
yield from futures.add(client.run())
yield from futures.wait()
'asyncio.Queue'是一个* final *类不用于继承。 因此,即使技术上可行,用户也不应该从'asyncio.Queue'派生自己的类。 –
@AndrewSvetlov我想用户可能想要从'asyncio.Queue'继承来创建不同类型的队列(比如'asyncio.PriorityQueue'或'asyncio.LifoQueue'),但在这种情况下,我只是懒惰:p I无论如何摆脱它。 – Vincent
不,用户不能(至少不应该)。 'LifoQueue'和'PriorityQueue'是'asyncio'类,不适用于继承。 用于继承的唯一'asyncio'类是'Protocol'和family。 当我们设计图书馆时,国家几次被Guido van Rossum发音。 –
你可以[使用信号量来限制并发连接的数量(http://*.com/a/20722204/4279) – jfs
请装饰WebSocketClient如' @ coroutine' –
@AndrewSvetlov是的,它是装饰 - 复制/粘贴错误 – warvariuc