How to implement recvmsg() with asyncio?

Problem description:

Since recvmsg() is missing from the asyncio module, I tried to reimplement it in exactly the same way BaseEventLoop.sock_recv() is implemented:

import asyncio
import socket


def _sock_recvmsg(loop, fut, registered, sock, bufsize, ancbufsize):
    self = loop
    fd = sock.fileno()
    if registered:
        # The reader was added by a previous attempt; remove it before retrying.
        self.remove_reader(fd)
    if fut.cancelled():
        return
    try:
        data = sock.recvmsg(bufsize, ancbufsize)
    except (BlockingIOError, InterruptedError):
        # No data yet: retry once the socket becomes readable.
        self.add_reader(fd, self._sock_recvmsg, fut, True, sock, bufsize, ancbufsize)
    except Exception as exc:
        fut.set_exception(exc)
    else:
        fut.set_result(data)


def sock_recvmsg(loop, sock, bufsize, ancbufsize=0):
    self = loop
    if self._debug and sock.gettimeout() != 0:
        raise ValueError('the socket must be non-blocking')
    fut = asyncio.futures.Future(loop=self)
    self._sock_recvmsg(fut, False, sock, bufsize, ancbufsize)
    return fut


asyncio.unix_events._UnixSelectorEventLoop._sock_recvmsg = _sock_recvmsg
asyncio.unix_events._UnixSelectorEventLoop.sock_recvmsg = sock_recvmsg

But this trivial test fails: it receives only the first value and then hangs:

async def produce():
    for i in range(5):
        end_out.sendmsg([bytes([i])])
        await asyncio.sleep(1)

    end_out.close()


async def consume():
    while True:
        v, a, b, c = await asyncio.get_event_loop().sock_recvmsg(end_in, 1)
        if v == b'':
            return
        print(v)


if __name__ == '__main__':
    end_in, end_out = socket.socketpair()
    asyncio.ensure_future(produce())
    asyncio.get_event_loop().run_until_complete(consume())

What am I missing?

The recvmsg()-related code is correct, but I had forgotten:

end_in.setblocking(0) 
end_out.setblocking(0) 

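With blocking sockets, the second sock.recvmsg() call blocks the whole event loop instead of raising BlockingIOError, so produce() never gets to run again. The cause is so silly that I am considering deleting this question.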
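
For reference, the same idea can be sketched without patching the private _UnixSelectorEventLoop class, using only the public loop.add_reader() and loop.remove_reader() calls. This is a minimal sketch, not asyncio's own implementation: the helper name recvmsg_async is hypothetical, it assumes a Unix selector-based event loop and Python 3.7+ (for asyncio.run), and it does not unregister the reader if the future is cancelled before the socket becomes readable.

```python
import asyncio
import socket


def recvmsg_async(loop, sock, bufsize, ancbufsize=0):
    """Hypothetical helper: return a future that resolves to the tuple
    returned by sock.recvmsg(bufsize, ancbufsize).

    The socket must already be in non-blocking mode.
    """
    fut = loop.create_future()
    fd = sock.fileno()

    def on_readable():
        # Called by the loop when fd is readable; unregister before trying.
        loop.remove_reader(fd)
        if fut.cancelled():
            return
        try:
            data = sock.recvmsg(bufsize, ancbufsize)
        except (BlockingIOError, InterruptedError):
            # Spurious wakeup: wait for the next readability event.
            loop.add_reader(fd, on_readable)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(data)

    loop.add_reader(fd, on_readable)
    return fut


async def demo():
    loop = asyncio.get_running_loop()
    end_in, end_out = socket.socketpair()
    # The fix from the answer above: both ends must be non-blocking.
    end_in.setblocking(False)
    end_out.setblocking(False)

    async def produce():
        for i in range(3):
            end_out.sendmsg([bytes([i])])
            await asyncio.sleep(0.01)
        end_out.close()

    received = []
    producer = asyncio.ensure_future(produce())
    while True:
        data, ancdata, flags, addr = await recvmsg_async(loop, end_in, 1)
        if data == b'':  # peer closed the connection
            break
        received.append(data)
    await producer
    end_in.close()
    return received


result = asyncio.run(demo())
print(result)
```

Unlike the version in the question, this sketch always waits for a readability event before calling recvmsg(), which costs one extra loop iteration when data is already available but keeps the helper independent of private loop attributes.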