进程间通讯方式
1、管道通讯:Pipe
(1)原理
- 在内存中开辟一块空间,形成管道结构,管道对多个进程可见,进程可以通过对管道的读写操作进行通信
(2)流程
- 创建管道
from multiprocessing import Pipe
# 创建管道
fd1, fd2 = Pipe()
功能:创建一个管道
参数:默认表示管道为双向管道
如果设置为False
,则表示为单向通道
返回值:返回两个管道流对象,表示管道两端
如果是双向管道,则都可以读写
如果是单向管道,则fd1只读,fd2只写
- 从管道内读信息
fd1.recv()
功能:从管道内读信息
返回值:读取的内容
当管道内无内容时会阻塞
- 向管道写入数据
fd2.send()
功能:向管道内写入内容
参数:要写的内容
几乎可以发送python的任意数据类型
(3)示例演示
- 子进程向管道写数据,主进程从管道往出读数据
from multiprocessing import Process, Pipe
import os,time
#创建管道
fd1, fd2 = Pipe()
def fun(name):
time.sleep(3)
#向管道内写入内容
fd2.send('hello'+str(name))
joins = []
for i in range(5):
p = Process(target=fun, args=(i,))
joins.append(p)
p.start()
#读取管道内消息
for i in range(5):
data = fd1.recv()
print(data)
# 进程回收
for i in joins:
i.join()
运行结果
[email protected]-virtual-machine:~/Process_mode$ python3 pool.py
hello0
hello1
hello2
hello4
hello3
2、消息队列:Queue
(1)原理
- 队列:先进先出
- 在内存中开辟队列结构空间,对多个进程可见。多个进程向对列中存入消息,读出消息,完成进程间通讯
(2)流程
- 创建队列
from multiprocessing import Queue
q = Queue(maxsize=0)
参数:maxsize默认表示根据系统分配空间存储消息,如果传入一个整数则表示最多存入消息数量
返回值:队列对象
- 向队列中存入消息
q.put(data,[block,timeout])
功能:向队列中存入消息
参数:data:存入的数据(支持python数据类型)
block:默认为True表示当队列满时阻塞
设置为False表示非阻塞
timeout:当block为True时表示超时时间
- 从队列中读取消息
data=q.get([block,timeout])
功能:从队列获取消息
参数:block:默认为True表示当队列满时阻塞
设置为False表示非阻塞
timeout:当block为True时表示超时时间
返回值:返回获取到的消息
(3)队列对象属性
-
q.full()
:判断队列是否为满 -
q.empty()
:判断队列是否为空 -
q.qsize()
:获取队列中消息数量 -
q.close()
:关闭队列
(4)示例演示
from multiprocessing import Process, Queue
import time
#创建消息队列
q = Queue()
def fun1():
time.sleep(1)
q.put("我是进程1")
def fun2():
time.sleep(2)
print("收到消息:",q.get())
p1 = Process(target=fun1)
p2 = Process(target=fun2)
p1.start()
p2.start()
p1.join()
p1.join()
运行结果
[email protected]-virtual-machine:~/Process_mode$ python3 pool.py
收到消息: 我是进程1
3、共享内存
(1)原理
- 在内存中开辟一段空间,存储数据,对多个进程可见,每次写入共享内存的数据会覆盖之前的内容(所以只能存一条消息)
(2)流程
- 创建对象
from multiprocessing import Value
obj = Value(ctype, obj)
功能:开辟共享内存空间
参数:ctype:字符串,要转变的c的类型code(对照ctype表)
obj:共享内存初始值
返回值:返回共享内存对象
- 通过
obj.value
访问内存对象
(3)代码示例
from multiprocessing import Process, Value
import time
import random
# 共享内存对象初始存放2000
money = Value('i',2000)
# 存钱进程
def deposite():
for i in range(100):
time.sleep(0.05)
# 对value操作实际就是操作共享内存
money.value += random.randint(1, 200)
# 取钱进程
def withdraw():
for i in range(100):
time.sleep(0.04)
# 对value操作实际就是操作共享内存
money.value -= random.randint(1, 200)
d=Process(target=deposite)
w=Process(target=withdraw)
d.start()
w.start()
d.join()
w.join()
print(money.value)
- 创建对象
from multiprocessing import Array
obj = Array(ctype, obj)
功能:开辟共享内存空间
参数:ctype:要转换的类型
obj:要存入共享内存的数据
列表
:将列表存入共享内存 ,要求类型一致
整数
:在共享内存中开辟几个单元的空间
- 通过索引下标访问内存对象
from multiprocessing import Process, Array
import time
# 创建共享内存,列表是共享内存的初始值
# shm = Array('i',[1,2,3,4,5])
# 共享内存中开辟出5个空间
shm = Array('i', 5)
def fun():
for i in shm:
print(i)
# 修改共享内存内容
shm[3] = 1000
p = Process(target=fun)
p.start()
p.join()
for i in shm:
print(i)
4、进程池中的Queue
- 如果要使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue()
# -*- coding:utf-8 -*-
# 修改import中的Queue为Manager
from multiprocessing import Manager, Pool
import os, time, random
def reader(q):
print("reader启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
# 获取队列中消息数量
for i in range(q.qsize()):
# 当队列消息数量满时阻塞
print("reader从Queue获取到消息:%s" % q.get(True))
def writer(q):
print("writer启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
for i in "abcdefg":
q.put(i)
if __name__=="__main__":
print("(%s) start" % os.getpid())
q = Manager().Queue() # 使用Manager中的Queue
po = Pool()
po.apply_async(writer, (q,))
time.sleep(1) # 先让上面的任务向Queue存入数据,然后再让下面的任务开始从中取数据
po.apply_async(reader, (q,))
po.close()
po.join()
print("(%s) End" % os.getpid())
运行结果:
[email protected]-virtual-machine:~/Process_mode$ python3 pool.py
(17908) start
writer启动(17914),父进程为(17908)
reader启动(17915),父进程为(17908)
reader从Queue获取到消息:a
reader从Queue获取到消息:b
reader从Queue获取到消息:c
reader从Queue获取到消息:d
reader从Queue获取到消息:e
reader从Queue获取到消息:f
reader从Queue获取到消息:g
(17908) End