Python并发之分布式进程(3)
如果任务数量实在庞大,在一台机器上进行有限的多进程工作模式也往往很难吃的消。这个时候,我们就需要把多进程分配到多台机器上通过网络互相通信进行协同工作,相比最多只能分布到多个CPU的线程,这种工作模式的效率会有非常大的提升。
这种建立在网络之上的软件系统,就称为分布式系统(distributed system)
分布式系统有两大特点:内聚性和透明性
- 内聚性:每一个数据库节点高度自治,有本地的数据库管理系统。
- 透明性:每一个数据库分布节点对于用户的应用来说都是透明的,是无法区分本地还是远程的。
现在我们来了解一下,如何使用Python搭建分布式进程工作模式。
Python的multiprocessing
模块不但支持多进程,其中managers
子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers
模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。
总的分为两个部分:master端和worker端。master端,用于任务的发布、通信队列的创建等;worker端用于完成服务端分配的任务,并将结果放入通信队列。
BaseManager初始化函数:
BaseManager([address[, authkey]])
- address:是管理器进程侦听新连接的地址。 如果地址是无,则选择任意一个。
- authkey:是将用于检查到服务器进程的传入连接的有效性的认证**。 如果authkey是None,那么使用当前进程current_process()的authkey; 否则使用的authkey,它必须是字符串。
一旦创建BaseManager对象,应调用start()或get_server()。serve_forever()以确保管理器对象引用已启动的管理器进程。
BaseManager对象方法和属性如下:
start([initializer [,initargs]]) | 启动子过程以启动管理器。 如果初始化程序不是None,那么子程序在启动时会调用initializer(*initargs) |
get_server() | 返回一个Server对象,它表示在Manager控制下的实际服务器。 Server对象支持serve_forever()方法。 |
connect() | 将本地管理器对象连接到远程管理器进程 |
shutdown() | 停止管理器在使用的进程。这仅在用start()已启动服务器进程时可用,可以被多次调用。 |
register(typeid [,callable [,proxytype [,exposed [,method_to_typeid [,create_method]]]]]) |
可以用于向管理器类注册类型或可调用的类方法。
|
address | 管理器使用的地址 |
join(timeout=None) | 阻塞,直到所有子进程任务结束 |
1,分布式进程间的数据共享
server主进程注册全局变量,client子进程可以更新变量。
server.py代码:
from multiprocessing.managers import BaseManager
if __name__ == "__main__":
list_data = ["Leo", "Frank"]
mgr = BaseManager(address=('127.0.0.1', 4444), authkey=b'hello')
mgr.register('get_account', callable=lambda : list_data) ##注册全局变量
server = mgr.get_server()
server.serve_forever() ##server用不关闭
client.py代码:
from multiprocessing.managers import BaseManager
from multiprocessing import Process, Manager
def add_user1(list):
list.append("Bruce")
def add_user2(list):
list.append("John")
if __name__ == "__main__":
mgr = BaseManager(address=('127.0.0.1', 4444), authkey=b'hello')
mgr.register('get_account')
mgr.connect()
data = mgr.get_account()
p1 = Process(target=add_user1, args=(data, ))
p1.start()
p1.join()
p2 = Process(target=add_user2, args=(data, ))
p2.start()
p2.join()
print(data)
client输出如下:
['Leo', 'Frank', 'Bruce', 'John', 'Bruce', 'John']
2,使用场景示例
场景一:简单的master/worker模型
master向task_queue发布put任务,worker从task_queue获取get到任务并处理完毕后将结果发布put到result_queue中,master再从result_queue中get到执行结果并汇总。
master端代码:
from multiprocessing import Queue
from multiprocessing.managers import BaseManager
task_queue = Queue() ##任务队列
result_queue = Queue() ##执行结果队列
def get_task():
return task_queue
def get_result():
return result_queue
class TaskManager(BaseManager):
pass
# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
TaskManager.register('get_task_queue', callable=get_task)
TaskManager.register('get_result_queue', callable=get_result)
## 绑定端口,设定访问密码
mgr = TaskManager(address=("127.0.0.1", 4444), authkey=b"python") ##注意authkey不能为明文,否则报错TypeError: string argument without an encoding
## 启动
mgr.start()
# 获得通过网络访问的Queue对象:
task = mgr.get_task_queue()
result = mgr.get_result_queue()
data_list = [1, 2, 3, 4, 5, 6]
##发布任务
for data in data_list:
task.put(data)
print("put data in task: {}".format(data))
##获取结果
for i in range(len(data_list)):
res = result.get(timeout=5)
print("get result: {}".format(res))
mgr.shutdown()
print("master is shutdown")
worker端代码:
import time
from multiprocessing.managers import BaseManager
class TaskManager(BaseManager):
pass
#从网络上获取Queue,注册时只需要提供名字
TaskManager.register("get_task_queue")
TaskManager.register("get_result_queue")
mgr = TaskManager(address=("127.0.0.1", 4444), authkey=b"python")
mgr.connect()
task = mgr.get_task_queue()
result = mgr.get_result_queue()
while not task.empty():
data = task.get(timeout=5)
resp = data ** 2
time.sleep(1)
result.put(resp)
print("resp in worker: {}".format(resp))
print("worker exit")
先启动master,然后再启动worker
master输出如下:
put data in task: 1
put data in task: 2
put data in task: 3
put data in task: 4
put data in task: 5
put data in task: 6
get result: 1
get result: 4
get result: 9
get result: 16
get result: 25
get result: 36
master is shutdown
worker输出如下:
resp in worker: 1
resp in worker: 4
resp in worker: 9
resp in worker: 16
resp in worker: 25
resp in worker: 36
worker exit
这个简单的Master/Worker模型有什么用?其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,
从work的代码中可以看出并没有创建queue,queue是直接从网络获取的master的
而Queue
之所以能通过网络访问,就是通过QueueManager
实现的。由于QueueManager
管理的不止一个Queue
,所以,要给每个Queue
的网络调用接口起个名字,比如get_task_queue
。
2, 场景二:简单的任务分布式系统
master端代码:
master端产生任务,并将任务下发给worker。从worker收集处理结果
from multiprocessing import Queue
from multiprocessing.managers import BaseManager
class JobSender(object):
def __init__(self, job_list):
self.job_list = job_list
self.task_list = [ data *2 for data in self.job_list]
class Master(object):
def __init__(self, job_list):
self.task_queue = Queue()
self.result_queue = Queue()
self.job_list = JobSender(job_list).task_list
def get_task_queue(self):
return self.task_queue
def get_result_queue(self):
return self.result_queue
def run(self):
BaseManager.register("tasks_queue", callable=self.get_task_queue)
BaseManager.register("results_queue", callable=self.get_result_queue)
mgr = BaseManager(address=('127.0.0.1', 4444), authkey=b'abc')
mgr.start()
tasks_list = mgr.tasks_queue()
results_list = mgr.results_queue()
while True:
for job in self.job_list:
tasks_list.put(job)
print("master put job: {}".format(job))
while not tasks_list.empty():
job_result = results_list.get(timeout=10)
print("master get result: {}".format(job_result))
mgr.shutdown()
if __name__ == "__main__":
job_list = [1, 2, 3, 4, 5]
master = Master(job_list)
master.run()
worker端代码:
worker端接受任务,并返回处理结果
import time
from multiprocessing.managers import BaseManager
class JobHandler(object):
def __init__(self, job):
self.job_data = job
def handler(self):
data = "value_" + str(self.job_data) + str(time.ctime())
return data
class Worker(object):
def run(self):
BaseManager.register("tasks_queue")
BaseManager.register("results_queue")
mgr = BaseManager(address=('127.0.0.1', 4444), authkey=b'abc')
mgr.connect()
tasks_list = mgr.tasks_queue()
results_list = mgr.results_queue()
# while True:
while not tasks_list.empty():
task = tasks_list.get(timeout=10)
print("worker get task: {}".format(task))
result = JobHandler(task).handler()
results_list.put(result)
print("worker put result: {}".format(result))
if __name__ == "__main__":
worker = Worker()
worker.run()
先启动master,然后再启动worker
master输出:
master put job: 2
master put job: 4
master put job: 6
master put job: 8
master put job: 10
master get result: value_2Wed Oct 24 22:53:09 2018
master get result: value_4Wed Oct 24 22:53:09 2018
master get result: value_6Wed Oct 24 22:53:09 2018
master get result: value_8Wed Oct 24 22:53:09 2018
master get result: value_10Wed Oct 24 22:53:09 2018
master put job: 2
master put job: 4
master put job: 6
master put job: 8
master put job: 10
worker输出:
worker get task: 2
worker put result: value_2Wed Oct 24 22:53:09 2018
worker get task: 4
worker put result: value_4Wed Oct 24 22:53:09 2018
worker get task: 6
worker put result: value_6Wed Oct 24 22:53:09 2018
worker get task: 8
worker put result: value_8Wed Oct 24 22:53:09 2018
worker get task: 10
worker put result: value_10Wed Oct 24 22:53:09 2018
参考文献:
https://zhuanlan.zhihu.com/p/32910840
http://blog.51cto.com/11026142/1874807
分布式进程--廖雪峰