逐步创造异步任务,并等待他们全部完成

问题描述:

我试图做一个程序,使很多网络套接字连接到我创建了服务器:逐步创造异步任务,并等待他们全部完成

class WebSocketClient(): 

    @asyncio.coroutine 
    def run(self): 
     print(self.client_id, 'Connecting') 
     ws = yield from aiohttp.ws_connect(self.url) 
     print(self.client_id, 'Connected') 
     print(self.client_id, 'Sending the message') 
     ws.send_str(self.make_new_message()) 

     while not ws.closed: 
      msg = yield from ws.receive() 

      if msg.tp == aiohttp.MsgType.text: 
       print(self.client_id, 'Received the echo') 
       yield from ws.close() 
       break 

     print(self.client_id, 'Closed') 


@asyncio.coroutine 
def make_clients(): 

    for client_id in range(args.clients): 
     yield from WebSocketClient(client_id, WS_CHANNEL_URL.format(client_id=client_id)).run() 


event_loop.run_until_complete(make_clients()) 

的问题是,所有客户端完成他们的工作一个接一个:

0 Connecting 
0 Connected 
0 Sending the message 
0 Received the echo 
0 Closed 
1 Connecting 
1 Connected 
1 Sending the message 
1 Received the echo 
1 Closed 
... 

我试着使用asyncio.wait,但是所有客户一起开始。我希望它们逐渐创建并在创建它们时立即连接到服务器。同时继续创造新客户。

我应该用什么方法来实现这个目标?

+0

你可以[使用信号量来限制并发连接的数量(http://*.com/a/20722204/4279) – jfs

+0

请装饰WebSocketClient如' @ coroutine' –

+0

@AndrewSvetlov是的,它是装饰 - 复制/粘贴错误 – warvariuc

使用asyncio.wait是一种很好的方法。你可以用asyncio.ensure_futureasyncio.sleep结合起来,逐步创建任务:

@asyncio.coroutine 
def make_clients(nb_clients, delay): 
    futures = [] 
    for client_id in range(nb_clients): 
     url = WS_CHANNEL_URL.format(client_id=client_id) 
     coro = WebSocketClient(client_id, url).run() 
     futures.append(asyncio.ensure_future(coro)) 
     yield from asyncio.sleep(delay) 
    yield from asyncio.wait(futures) 

编辑:我实现了一个FutureSet类,应该做你想要什么。这个集合可以填充期货,并在完成后自动删除它们。也可以等待所有的未来完成。

class FutureSet: 

    def __init__(self, maxsize, *, loop=None): 
     self._set = set() 
     self._loop = loop 
     self._maxsize = maxsize 
     self._waiters = [] 

    @asyncio.coroutine 
    def add(self, item): 
     if not asyncio.iscoroutine(item) and \ 
      not isinstance(item, asyncio.Future): 
      raise ValueError('Expecting a coroutine or a Future') 
     if item in self._set: 
      return 
     while len(self._set) >= self._maxsize: 
      waiter = asyncio.Future(loop=self._loop) 
      self._waiters.append(waiter) 
      yield from waiter 
     item = asyncio.async(item, loop=self._loop)  
     self._set.add(item) 
     item.add_done_callback(self._remove) 

    def _remove(self, item): 
     if not item.done(): 
      raise ValueError('Cannot remove a pending Future') 
     self._set.remove(item) 
     if self._waiters: 
      waiter = self._waiters.pop(0) 
      waiter.set_result(None) 

    @asyncio.coroutine 
    def wait(self): 
     return asyncio.wait(self._set) 

实施例:

@asyncio.coroutine 
def make_clients(nb_clients, limit=0): 
    futures = FutureSet(maxsize=limit) 
    for client_id in range(nb_clients): 
     url = WS_CHANNEL_URL.format(client_id=client_id) 
     client = WebSocketClient(client_id, url) 
     yield from futures.add(client.run()) 
    yield from futures.wait() 
+0

'asyncio.Queue'是一个* final *类不用于继承。 因此,即使技术上可行,用户也不应该从'asyncio.Queue'派生自己的类。 –

+0

@AndrewSvetlov我想用户可能想要从'asyncio.Queue'继承来创建不同类型的队列(比如'asyncio.PriorityQueue'或'asyncio.LifoQueue'),但在这种情况下,我只是懒惰:p I无论如何摆脱它。 – Vincent

+0

不,用户不能(至少不应该)。 'LifoQueue'和'PriorityQueue'是'asyncio'类,不适用于继承。 用于继承的唯一'asyncio'类是'Protocol'和family。 当我们设计图书馆时,国家几次被Guido van Rossum发音。 –