Python asyncio:在工作线程上运行subprocess_exec
因此,我使用Python asyncio
模块(在Linux上)启动子进程,然后对其进行异步监视。我的代码工作正常......当在主线程上运行。但是,当我在工作线程上运行它时,它会挂起,并且从不调用回调函数。Python asyncio:在工作线程上运行subprocess_exec
我怀疑这可能实际上是某种未公开的缺陷或在工作线程上运行subprocess_exec
问题,可能与实现在后台线程中处理信号的方式有关。但它也可能只是我搞砸了。
一个简单的,可再现的例子是如下:因此,这里
class MyProtocol(asyncio.SubprocessProtocol):
def __init__(self, done_future):
super().__init__()
self._done_future = done_future
def pipe_data_received(self, fd, data):
print("Received:", len(data))
def process_exited(self):
print("PROCESS EXITED!")
self._done_future.set_result(None)
def run(loop):
done_future = asyncio.Future(loop = loop)
transport = None
try:
transport, protocol = yield from loop.subprocess_exec(
lambda : MyProtocol(done_future),
"ls",
"-lh",
stdin = None
)
yield from done_future
finally:
if transport: transport.close()
return done_future.result()
def run_loop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) # bind event loop to current thread
try:
return loop.run_until_complete(run(loop))
finally:
loop.close()
,我设置一个asyncio
事件循环执行该外壳命令ls -lh
,然后触发当从子接收到的数据为回调,以及子进程退出时的另一个回调。
如果我直接在Python程序的主线程中调用run_loop()
,一切都会正常。但是,如果我说:
t = threading.Thread(target = run_loop)
t.start()
t.join()
那么什么情况是,pipe_data_received()
回调调用成功,但process_exited()
则永远不会调用,程序只是挂起。
周围的Googling和寻找为unix_events.py
实施asyncio
源代码后,我发现它可能需要我的事件循环手动连接到全球“孩子守望者”对象,如下所示:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) # bind event loop to current thread
asyncio.get_child_watcher().attach_loop(loop)
显然,子监视器是一个(无证)对象,负责在引擎盖下(或类似的地方)调用waitpid
。但是,当我想这一点,并在后台线程跑run_event_loop()
,我得到了错误:
File "/usr/lib/python3.4/asyncio/unix_events.py", line 77, in add_signal_handler
raise RuntimeError(str(exc))
RuntimeError: set_wakeup_fd only works in main thread
所以在这里,它看起来像的实施实际上做了检查,以确保信号处理程序只能在主用线程,导致我相信,在当前的实现中,在后台线程上使用subprocess_exec
实际上,根本不可能不改变Python源代码本身。
我正确吗?不幸的是,asyncio
模块的记录很少,所以我很难对这里的结论充满信心。我可能只是在做错事。
只要一ASYNCIO循环和其子观察家主线程运行在辅助线程处理子过程是好的实例化:
asyncio.get_child_watcher()
loop = asyncio.get_event_loop()
coro = loop.run_in_executor(None, run_loop)
loop.run_until_complete(coro)