如果在超时时间内完成执行或者进行回调,则返回函数

问题描述:

我在Python 3.5中有一个项目,没有使用异步功能。我要实现如下因素的逻辑:如果在超时时间内完成执行或者进行回调,则返回函数

def should_return_in_3_sec(some_serious_job, arguments, finished_callback): 
    # Start some_serious_job(*arguments) in a task 
    # if it finishes within 3 sec: 
    # return result immediately 
    # otherwise return None, but do not terminate task. 
    # If the task finishes in 1 minute: 
    # call finished_callback(result) 
    # else: 
    # call finished_callback(None) 
    pass 

功能should_return_in_3_sec()应保持同步,但它是由我来写任何新的异步代码(包括some_serious_job())。

什么是最优雅和pythonic的方式来做到这一点?

从一个执行严肃工作的线程开始,让它将其结果写入一个队列,然后终止。从该队列读取主线程,超时三秒。如果发生超时,请启动另一个线程并返回无。让第二个线程以超过一分钟的时间从队列中读取;如果超时,也可以调用finished_callback(None);否则调用finished_callback(result)。

我勾画这样的:

import threading, queue 

def should_return_in_3_sec(some_serious_job, arguments, finished_callback): 
    result_queue = queue.Queue(1) 

    def do_serious_job_and_deliver_result(): 
    result = some_serious_job(arguments) 
    result_queue.put(result) 

    threading.Thread(target=do_serious_job_and_deliver_result).start() 

    try: 
    result = result_queue.get(timeout=3) 
    except queue.Empty: # timeout? 

    def expect_and_handle_late_result(): 
     try: 
     result = result_queue.get(timeout=60) 
     except queue.Empty: 
     finished_callback(None) 
     else: 
     finished_callback(result) 

    threading.Thread(target=expect_and_handle_late_result).start() 
    return None 
    else: 
    return result 
+0

不错!虽然我没有发现在稍后的超时时间终止serious_job ...我也想知道,在这个多线程示例中使用标准'队列'是否可以? – greatvovan

+0

“队列”模块用于线程上下文,是的。它明确同步它的使用。当然,如果终止拆分线程,你会遇到一个小问题(取决于你的上下文,如果它实际上会成为问题)。但是,你给他们的规格不允许再次正确加入他们。 – Alfe

threading模块有一些简单的超时选项,请参见Thread.join(timeout)例如。

如果你选择使用ASYNCIO,下面是解决一些您的需求AA部分解决:

import asyncio 

import time 


async def late_response(task, flag, timeout, callback): 
    done, pending = await asyncio.wait([task], timeout=timeout) 
    callback(done.pop().result() if done else None) # will raise an exception if some_serious_job failed 
    flag[0] = True # signal some_serious_job to stop 
    return await task 


async def launch_job(loop, some_serious_job, arguments, finished_callback, 
        timeout_1=3, timeout_2=5): 
    flag = [False] 
    task = loop.run_in_executor(None, some_serious_job, flag, *arguments) 
    done, pending = await asyncio.wait([task], timeout=timeout_1) 
    if done: 
     return done.pop().result() # will raise an exception if some_serious_job failed 
    asyncio.ensure_future(
     late_response(task, flag, timeout_2, finished_callback)) 
    return None 


def f(flag, n): 
    for i in range(n): 
     print("serious", i, flag) 
     if flag[0]: 
      return "CANCELLED" 
     time.sleep(1) 
    return "OK" 


def finished(result): 
    print("FINISHED", result) 


loop = asyncio.get_event_loop() 
result = loop.run_until_complete(launch_job(loop, f, [1], finished)) 
print("result:", result) 
loop.run_forever() 

这将在一个单独的线程(使用loop.set_executor(ProcessPoolExecutor())运行作业在运行CPU密集型任务一个过程)。请记住,终止进程/线程是一种不好的做法 - 上面的代码使用非常简单的列表来指示线程停止(另请参阅threading.Event/multiprocessing.Event)。

在实施您的解决方案时,您可能会发现您希望修改现有代码以使用couroutine而不是使用线程。