更换出现多处理pool.map与mpi4py
问题描述:
我在使用MPI初学者,我还在通过文件去。然而,当涉及到mpi4py时,几乎没有什么工作要做。我写了目前使用的多模块许多内核上运行代码,但我需要mpi4py这样我就可以使用一个以上的节点来运行我的代码替换此。我的代码如下,使用多处理模块时,也没有。更换出现多处理pool.map与mpi4py
随着多,
import numpy as np
import multiprocessing
start_time = time.time()
E = 0.1
M = 5
n = 1000
G = 1
c = 1
stretch = [10, 1]
#Point-Distribution Generator Function
def CDF_inv(x, e, m):
A = 1/(1 + np.log(m/e))
if x == 1:
return m
elif 0 <= x <= A:
return e * x/A
elif A < x < 1:
return e * np.exp((x/A) - 1)
#Elliptical point distribution Generator Function
def get_coor_ellip(dist=CDF_inv, params=[E, M], stretch=stretch):
R = dist(random.random(), *params)
theta = random.random() * 2 * np.pi
return (R * np.cos(theta) * stretch[0], R * np.sin(theta) * stretch[1])
def get_dist_sq(x_array, y_array):
return x_array**2 + y_array**2
#Function to obtain alpha
def get_alpha(args):
zeta_list_part, M_list_part, X, Y = args
alpha_x = 0
alpha_y = 0
for key in range(len(M_list_part)):
z_m_z_x = X - zeta_list_part[key][0]
z_m_z_y = Y - zeta_list_part[key][1]
dist_z_m_z = get_dist_sq(z_m_z_x, z_m_z_y)
alpha_x += M_list_part[key] * z_m_z_x/dist_z_m_z
alpha_y += M_list_part[key] * z_m_z_y/dist_z_m_z
return (alpha_x, alpha_y)
#The part of the process containing the loop that needs to be parallelised, where I use pool.map()
if __name__ == '__main__':
# n processes, scale accordingly
num_processes = 10
pool = multiprocessing.Pool(processes=num_processes)
random_sample = [CDF_inv(x, E, M)
for x in [random.random() for e in range(n)]]
zeta_list = [get_coor_ellip() for e in range(n)]
x1, y1 = zip(*zeta_list)
zeta_list = np.column_stack((np.array(x1), np.array(y1)))
x = np.linspace(-3, 3, 100)
y = np.linspace(-3, 3, 100)
X, Y = np.meshgrid(x, y)
print len(x)*len(y)*n,'calculations to be carried out.'
M_list = np.array([.001 for i in range(n)])
# split zeta_list, M_list, X, and Y
zeta_list_split = np.array_split(zeta_list, num_processes, axis=0)
M_list_split = np.array_split(M_list, num_processes)
X_list = [X for e in range(num_processes)]
Y_list = [Y for e in range(num_processes)]
alpha_list = pool.map(
get_alpha, zip(zeta_list_split, M_list_split, X_list, Y_list))
alpha_x = 0
alpha_y = 0
for e in alpha_list:
alpha_x += e[0] * 4 * G/(c**2)
alpha_y += e[1] * 4 * G/(c**2)
print("%f seconds" % (time.time() - start_time))
无多,
import numpy as np
E = 0.1
M = 5
G = 1
c = 1
M_list = [.1 for i in range(n)]
#Point-Distribution Generator Function
def CDF_inv(x, e, m):
A = 1/(1 + np.log(m/e))
if x == 1:
return m
elif 0 <= x <= A:
return e * x/A
elif A < x < 1:
return e * np.exp((x/A) - 1)
n = 1000
random_sample = [CDF_inv(x, E, M)
for x in [random.random() for e in range(n)]]
stretch = [5, 2]
#Elliptical point distribution Generator Function
def get_coor_ellip(dist=CDF_inv, params=[E, M], stretch=stretch):
R = dist(random.random(), *params)
theta = random.random() * 2 * np.pi
return (R * np.cos(theta) * stretch[0], R * np.sin(theta) * stretch[1])
#zeta_list is the list of coordinates of a distribution of points
zeta_list = [get_coor_ellip() for e in range(n)]
x1, y1 = zip(*zeta_list)
zeta_list = np.column_stack((np.array(x1), np.array(y1)))
#Creation of a X-Y Grid
x = np.linspace(-3, 3, 100)
y = np.linspace(-3, 3, 100)
X, Y = np.meshgrid(x, y)
def get_dist_sq(x_array, y_array):
return x_array**2 + y_array**2
#Calculation of alpha, containing the loop that needs to be parallelised.
alpha_x = 0
alpha_y = 0
for key in range(len(M_list)):
z_m_z_x = X - zeta_list[key][0]
z_m_z_y = Y - zeta_list[key][1]
dist_z_m_z = get_dist_sq(z_m_z_x, z_m_z_y)
alpha_x += M_list[key] * z_m_z_x/dist_z_m_z
alpha_y += M_list[key] * z_m_z_y/dist_z_m_z
alpha_x *= 4 * G/(c**2)
alpha_y *= 4 * G/(c**2)
基本上我的代码所做的是,它首先生成按照一定的分发点的列表。然后我应用一个方程来获得数量'阿尔法'使用点之间的距离之间的不同关系。需要并行化的部分是计算alpha所涉及的单个for循环。我想要做的是使用mpi4py而不是多处理来做到这一点,我不知道如何去做这件事。
答
改造multiprocessing.map版本MPI可以使用scatter
/gather
来完成。在你的情况下,它是有用的,你已经准备好输入列表为每个等级的一个块。主要的区别是,所有的代码被用在首位各级执行,所以你必须让那些只有大师等级0 conidtional来所做的一切。
if __name__ == '__main__':
comm = MPI.COMM_WORLD
if comm.rank == 0:
random_sample = [CDF_inv(x, E, M)
for x in [random.random() for e in range(n)]]
zeta_list = [get_coor_ellip() for e in range(n)]
x1, y1 = zip(*zeta_list)
zeta_list = np.column_stack((np.array(x1), np.array(y1)))
x = np.linspace(-3, 3, 100)
y = np.linspace(-3, 3, 100)
X, Y = np.meshgrid(x, y)
print len(x)*len(y)*n,'calculations to be carried out.'
M_list = np.array([.001 for i in range(n)])
# split zeta_list, M_list, X, and Y
zeta_list_split = np.array_split(zeta_list, comm.size, axis=0)
M_list_split = np.array_split(M_list, comm.size)
X_list = [X for e in range(comm.size)]
Y_list = [Y for e in range(comm.size)]
work_list = list(zip(zeta_list_split, M_list_split, X_list, Y_list))
else:
work_list = None
my_work = comm.scatter(work_list)
my_alpha = get_alpha(my_work)
alpha_list = comm.gather(my_alpha)
if comm.rank == 0:
alpha_x = 0
alpha_y = 0
for e in alpha_list:
alpha_x += e[0] * 4 * G/(c**2)
alpha_y += e[1] * 4 * G/(c**2)
只要每个处理器获得类似数量的工作,这工作正常。如果通信成为一个问题,你可能要处理器,而不是做这一切的主级别0
注中拆分数据生成:关于代码的有些东西是假的,例如alpha_[xy]
结束为np.ndarray
。串行版本发生错误。
哇!这工作完美,谢谢!就目前而言,数据生成似乎并不是太耗时,但我会将它分发给其他处理器。而且,alpha_ [xy]不是一个有效的变量,我实际上使用alpha_x和alpha_y来派生另一个称为渐变的数量。当我运行它时,它似乎正常工作... – ThunderFlash