并行化嵌套Python for循环

问题描述:

什么类型的并行Python方法适用于有效传播CPU绑定工作负载,如下所示。并行化该部分是否可行?看起来循环迭代之间没有太多紧密耦合,即只要在最后完成适当的通信以重建变量,就可以并行处理循环的部分。我目前使用的是Python2.7,但如果可以在较新的版本中轻松处理这个问题,那么我会考虑迁移代码库。并行化嵌套Python for循环

我试图用下面的例子来捕捉计算的精神。我相信它与我的实际代码具有相同的循环/变量之间的连通性。

nx = 20 
ny = 30 
myList1 = [0]*100 
myList2 = [1]*25 
value1 = np.zeros(nx) 
value2 = np.zeros(ny) 
store = np.zeros(nx,ny,len(myList1),len(myList2)) 
for i in range(nx): 
    for j in range(ny): 
    f = calc(value1[i],value2[j]) #returns a list 
    for k,data1 in enumerate(myList1): 
     for p,data2 in enumerate(myList2): 
     meanval = np.sum(f[:]/data1)*data2 
     store[i,j,k,p] = meanval 
+1

如果你有Python或numpy的代码,而无需或仅与其他模块几乎没有互动,我会建议numba。这里是一个使用多处理的标准并行化的例子和比较。 https://*.com/a/45359693/4045774 – max9111

以下是您可以采取的两种方法。什么是智慧也取决于瓶颈在哪里,哪些东西最能衡量而不是猜测。

理想的选择是将所有低级优化都留给Numpy。现在,您可以混合使用本机Python代码和Numpy代码。后者在循环中播放不好。当然,它们是工作的,但通过在Python中进行循环,您可以按照指定的顺序强制执行操作。给予Numpy操作更好,它可以在尽可能多的元素上执行,即矩阵变换。这有利于性能,不仅仅是因为自动(部分)并行化;即使是单线程也能够从CPU中获得更多。强烈推荐阅读以了解更多关于此的是From Python to Numpy

如果您确实需要并行化纯Python代码,那么除了多进程之外,您没有多少选择。为此,请参阅multiprocessing模块。重新排列代码分为三个步骤:

  • 准备投入每一项工作
  • 分割工人的池之间的作业并行运行(叉/图)
  • 收集的结果(加入/减少)

您需要在足够的过程之间取得平衡,以使并行有价值,而不是太多,以至于它们会过于短暂。转变流程并与他们沟通的成本将会变得非常重要。

一个简单的解决方案是生成一个(i,j)对的列表,以便有nx*ny作业。然后创建一个函数,将这个对作为输入并返回一个列表(i,j,k,p,meanval)。尽量只使用函数的输入并返回结果。本地一切;无副作用等等。对全局变量(如myList1)的只读访问是可以的,但修改需要文档中描述的特殊措施。将函数和输入列表传递给工作池。一旦完成了部分结果的生成,将所有这些结果合并到store

下面是一个示例脚本:

from multiprocessing import Pool 
import numpy as np 

# Global variables are OK, as long as their contents are not modified, although 
# these might just as well be moved into the worker function or an initializer 
nx = 20 
ny = 30 
myList1 = [0]*100 
myList2 = [1]*25 
value1 = np.zeros(nx) 
value2 = np.zeros(ny) 

def calc_meanvals_for(pair): 
    """Process a reasonably sized chunk of the problem""" 
    i, j = pair 
    f = calc(value1[i], value2[j]) 
    results = [] 
    for k, data1 in enumerate(myList1): 
     for p, data2 in enumerate(myList2): 
      meanval = np.sum(f[:]/data1)*data2 
      results.append((i,j,k,p,meanval)) 
    return results 

# This module will be imported by every worker - that's how they will be able 
# to find the global variables and the calc function - so make sure to check 
# if this the main program, because without that, every worker will start more 
# workers, each of which will start even more, and so on, in an endless loop 
if __name__ == '__main__': 
    # Create a pool of worker processes, each able to use a CPU core 
    pool = Pool() 
    # Prepare the arguments, one per function invocation (tuples to fake multiple) 
    arg_pairs = [(i,j) for i in range(nx) for j in range(ny)] 
    # Now comes the parallel step: given a function and a list of arguments, 
    # have a worker invoke that function with one argument until all arguments 
    # have been used, collecting the return values in a list 
    return_values = pool.map(calc_meanvals_for, arg_pairs) 
    # Since the function also returns a list, there's now a list of lists - consider 
    # itertools.chain.from_iterable to flatten them - to be processed further 
    store = np.zeros(nx, ny, len(myList1), len(myList2)) 
    for results in return_values: 
     for i, j, k, p, meanval in results: 
      store[i,j,k,p] = meanval 
+0

Numpy不能很好的循环播放? – wandadars

+1

@wandadars我在回答中澄清了这个陈述。 –

+0

就多处理而言,多处理可以从一个连续执行的脚本开始,即代码串行到我问题中的部分,然后生成进程以运行这些循环,然后在完成后我收集它们的数据,继续使用串行脚本? – wandadars