运行多个异步函数并获取每个函数的返回值
问题描述:
我试图创建一个可以运行多个进程的异步函数,并将发送响应。由于multiprocessing.Process()
不返回响应,我想创建一个函数为:运行多个异步函数并获取每个函数的返回值
from multiprocessing import Process
def async_call(func_list):
"""
Runs the list of function asynchronously.
:param func_list: Expects list of lists to be of format
[[func1, args1, kwargs1], [func2, args2, kwargs2], ...]
:return: List of output of the functions
[output1, output2, ...]
"""
response_list = []
def worker(function, f_args, f_kwargs, response_list):
"""
Runs the function and appends the output to list
"""
response = function(*f_args, **f_kwargs)
response_list.append(response)
processes = [Process(target=worker, args=(func, args, kwargs, response_list)) \
for func, args, kwargs in func_list]
for process in processes:
process.start()
for process in processes:
process.join()
return response_list
在这个函数中,我称之为worker
异步它接受额外的参数作为list
。因为列表是作为参考传递的,所以我想我可以在列表中附加实际函数的响应。并且async_call
将返回所有函数的响应。
但这不是我期望的行为方式。将值附加到的worker()
内,但工作人员response_list
列表保持空白。
任何想法我做错了什么?而且,还有什么替代方案可以实现我的目标?
答
如Daniel's Answer所述,我们不能直接在进程之间共享对象。在这里我使用multiprocessing.Queue()
和更新功能:
def async_call(func_list):
"""
Runs the list of function asynchronously.
:param func_list: Expects list of lists to be of format
[[func1, args1, kwargs1], [func2, args2, kwargs2], ...]
:return: List of output of the functions
[output1, output2, ...]
"""
def worker(function, f_args, f_kwargs, queue, index):
"""
Runs the function and appends the output to list, and the Exception in the case of error
"""
response = {
'index': index, # For tracking the index of each function in actual list.
# Since, this function is called asynchronously, order in
# queue may differ
'data': None,
'error': None
}
# Handle error in the function call
try:
response['data'] = function(*f_args, **f_kwargs)
except Exception as e:
response['error'] = e # send back the exception along with the queue
queue.put(response)
queue = Queue()
processes = [Process(target=worker, args=(func, args, kwargs, queue, i)) \
for i, (func, args, kwargs) in enumerate(func_list)]
for process in processes:
process.start()
response_list = []
for process in processes:
# Wait for process to finish
process.join()
# Get back the response from the queue
response = queue.get()
if response['error']:
raise response['error'] # Raise exception if the function call failed
response_list.append(response)
return [content['data'] for content in sorted(response_list, key=lambda x: x['index'])]
采样运行:
def my_sum(x, y):
return x + y
def your_mul(x, y):
return x*y
my_func_list = [[my_sum, [1], {'y': 2}], [your_mul, [], {'x':1, 'y':2}]]
async_call(my_func_list)
# Value returned: [3, 2]
明白了。我只能对'queue.put()'执行的''queue.get()'做'queue.put()'。对于更多的访问项目,它会冻结。所以我正在迭代'func_list'的'len'。有没有更好的方法来做到这一点? –
有没有办法将该值与函数映射?作为一个黑客,我可以想到用一个键作为函数的索引返回字典对象,并返回键排序。有没有其他更pythonic的方式来实现这一目标? –