从asyncio StreamReader将字节泵入文件描述符

问题描述:

我有一个从文件描述符(包装在C++端FILE*中)读取的Python函数(用C++实现),我需要从asyncio.StreamReader提供函数。具体而言,读者是HTTP响应的内容:aiohttp.ClientResponse.content从asyncio StreamReader将字节泵入文件描述符

我想我可能open a pipe,将读取端传递给C++函数,并将connect the write-end传递给asyncio的事件循环。但是,如何通过适当的流量控制和尽可能少的复制将数据从流读取器移动到管道?

与缺少的部分代码的骨架如下:

# obtain the StreamReader from aiohttp 
content = aiohttp_client_response.content 
# create a pipe 
(pipe_read_fd, pipe_write_fd) = os.pipe() 

# now I need a suitable protocol to manage the pipe transport 
protocol = ? 
(pipe_transport, __) = loop.connect_write_pipe(lambda: protocol, pipe_write_fd) 

# the protocol should start reading from `content` and writing into the pipe 
return pipe_read_fd 

subprocess_attach_write_pipe ASYNCIO例如:

rfd, wfd = os.pipe() 
pipe = open(wfd, 'wb', 0) 
transport, _ = await loop.connect_write_pipe(asyncio.Protocol, pipe) 
transport.write(b'data') 

编辑 - 对于写入流控制,请参阅下面的方法:

这里是一个可能的FlowControl实施后,StreamWriter.drain启发:

class FlowControl(asyncio.streams.FlowControlMixin): 
    async def drain(self): 
     await self._drain_helper() 

用法:

transport, protocol = await loop.connect_write_pipe(FlowControl, pipe) 
transport.write(b'data') 
await protocol.drain() 
+0

这显示了如何打开管道与ASYNCIO写作,但不显示如何正确地复制从'asyncio.StreamReader'到管道。特别是,如果从管道读取的队伍速度太慢以至于跟不上StreamReader,简单地从阅读器读取字节块并将它们提供给'transport.write'可能会溢出缓冲区。 –

+0

@JanŠpaček看到我关于写流控制的编辑,希望有所帮助。 – Vincent

我解决此问题得到了通过使用ThreadPoolExecutor和阻塞调用os.write

(read_fd, write_fd) = os.pipe() 
task_1 = loop.create_task(pump_bytes_into_fd(write_fd)) 
task_2 = loop.run_in_executor(executor_1, parse_bytes_from_fd(read_fd)) 

async def pump_bytes_into_fd(write_fd): 
    while True: 
     chunk = await stream.read(CHUNK_SIZE) 
     if chunk is None: break 
     # process the chunk 
     await loop.run_in_executor(executor_2, os.write, write_fd, chunk) 

这是至关重要的两个不同的执行程序用于阻止读取和写入以避免死锁。