运行多个异步函数并获取每个函数的返回值

问题描述:

我试图创建一个可以运行多个进程的异步函数,并将发送响应。由于multiprocessing.Process()不返回响应,我想创建一个函数为:运行多个异步函数并获取每个函数的返回值

from multiprocessing import Process 

def async_call(func_list): 
    """ 
    Runs the list of function asynchronously. 

    :param func_list: Expects list of lists to be of format 
     [[func1, args1, kwargs1], [func2, args2, kwargs2], ...] 
    :return: List of output of the functions 
     [output1, output2, ...] 
    """ 
    response_list = [] 
    def worker(function, f_args, f_kwargs, response_list): 
     """ 
     Runs the function and appends the output to list 
     """ 
     response = function(*f_args, **f_kwargs) 
     response_list.append(response) 

    processes = [Process(target=worker, args=(func, args, kwargs, response_list)) \ 
        for func, args, kwargs in func_list] 

    for process in processes: 
     process.start() 
    for process in processes: 
     process.join() 
    return response_list 

在这个函数中,我称之为worker异步它接受额外的参数作为list。因为列表是作为参考传递的,所以我想我可以在列表中附加实际函数的响应。并且async_call将返回所有函数的响应。

但这不是我期望的行为方式。将值附加到的worker()内,但工作人员response_list列表保持空白。

任何想法我做错了什么?而且,还有什么替代方案可以实现我的目标?

您不能直接跨进程共享对象。您需要使用专门用于传递值的类Queue and Pipe;见the documentation

+0

明白了。我只能对'queue.put()'执行的''queue.get()'做'queue.put()'。对于更多的访问项目,它会冻结。所以我正在迭代'func_list'的'len'。有没有更好的方法来做到这一点? –

+0

有没有办法将该值与函数映射?作为一个黑客,我可以想到用一个键作为函数的索引返回字典对象,并返回键排序。有没有其他更pythonic的方式来实现这一目标? –

Daniel's Answer所述,我们不能直接在进程之间共享对象。在这里我使用multiprocessing.Queue()和更新功能:

def async_call(func_list): 
    """ 
    Runs the list of function asynchronously. 

    :param func_list: Expects list of lists to be of format 
     [[func1, args1, kwargs1], [func2, args2, kwargs2], ...] 
    :return: List of output of the functions 
     [output1, output2, ...] 
    """ 
    def worker(function, f_args, f_kwargs, queue, index): 
     """ 
     Runs the function and appends the output to list, and the Exception in the case of error 
     """ 
     response = { 
      'index': index, # For tracking the index of each function in actual list. 
          # Since, this function is called asynchronously, order in 
          # queue may differ 
      'data': None, 
      'error': None 
     } 

     # Handle error in the function call 
     try: 
      response['data'] = function(*f_args, **f_kwargs) 
     except Exception as e: 
      response['error'] = e # send back the exception along with the queue 

     queue.put(response) 
    queue = Queue() 
    processes = [Process(target=worker, args=(func, args, kwargs, queue, i)) \ 
        for i, (func, args, kwargs) in enumerate(func_list)] 

    for process in processes: 
     process.start() 

    response_list = [] 
    for process in processes: 
     # Wait for process to finish 
     process.join() 

     # Get back the response from the queue 
     response = queue.get() 
     if response['error']: 
      raise response['error'] # Raise exception if the function call failed 
     response_list.append(response) 

    return [content['data'] for content in sorted(response_list, key=lambda x: x['index'])] 

采样运行:

def my_sum(x, y): 
    return x + y 

def your_mul(x, y): 
    return x*y 

my_func_list = [[my_sum, [1], {'y': 2}], [your_mul, [], {'x':1, 'y':2}]] 

async_call(my_func_list) 
# Value returned: [3, 2]