进程间通讯方式

1、管道通讯:Pipe

(1)原理

  • 在内存中开辟一块空间,形成管道结构,管道对多个进程可见,进程可以通过对管道的读写操作进行通信

(2)流程

  1. 创建管道
from multiprocessing import Pipe

# 创建管道
fd1, fd2 = Pipe()

     功能:创建一个管道
     参数:默认表示管道为双向管道
               如果设置为False,则表示为单向通道
     返回值:返回两个管道流对象,表示管道两端
                    如果是双向管道,则都可以读写
                    如果是单向管道,则fd1只读,fd2只写

  1. 从管道内读信息
fd1.recv()

     功能:从管道内读信息
     返回值:读取的内容
                    当管道内无内容时会阻塞

  1. 向管道写入数据
fd2.send()

     功能:向管道内写入内容
     参数:要写的内容
               几乎可以发送python的任意数据类型

(3)示例演示

  • 子进程向管道写数据,主进程从管道往出读数据
from multiprocessing import Process, Pipe
import os,time

#创建管道
fd1, fd2 = Pipe()

def fun(name):
	time.sleep(3)
	#向管道内写入内容
	fd2.send('hello'+str(name))

joins = []
for i in range(5):
	p = Process(target=fun, args=(i,))
	joins.append(p)
	p.start()


#读取管道内消息
for i in range(5):
	data = fd1.recv()
	print(data)

# 进程回收
for i in joins:
	i.join()

运行结果

[email protected]-virtual-machine:~/Process_mode$ python3 pool.py 
hello0
hello1
hello2
hello4
hello3

2、消息队列:Queue

(1)原理

  • 队列:先进先出
  • 在内存中开辟队列结构空间,对多个进程可见。多个进程向对列中存入消息,读出消息,完成进程间通讯

(2)流程

  1. 创建队列
from multiprocessing import Queue

q = Queue(maxsize=0)

     参数:maxsize默认表示根据系统分配空间存储消息,如果传入一个整数则表示最多存入消息数量
     返回值:队列对象

  1. 向队列中存入消息
q.put(data,[block,timeout])

     功能:向队列中存入消息
     参数:data:存入的数据(支持python数据类型)
                block:默认为True表示当队列满时阻塞
                            设置为False表示非阻塞
     timeout:当block为True时表示超时时间

  1. 从队列中读取消息
data=q.get([block,timeout])

     功能:从队列获取消息
     参数:block:默认为True表示当队列满时阻塞
                            设置为False表示非阻塞
     timeout:当block为True时表示超时时间
     返回值:返回获取到的消息

(3)队列对象属性

  1. q.full():判断队列是否为满
  2. q.empty():判断队列是否为空
  3. q.qsize():获取队列中消息数量
  4. q.close():关闭队列

(4)示例演示

from multiprocessing import Process, Queue
import time


#创建消息队列
q = Queue()

def fun1():
	time.sleep(1)
	q.put("我是进程1")

def fun2():
	time.sleep(2)
	print("收到消息:",q.get())


p1 = Process(target=fun1)
p2 = Process(target=fun2)

p1.start()
p2.start()

p1.join()
p1.join()

运行结果

[email protected]-virtual-machine:~/Process_mode$ python3 pool.py 
收到消息: 我是进程1

3、共享内存

(1)原理

  • 在内存中开辟一段空间,存储数据,对多个进程可见,每次写入共享内存的数据会覆盖之前的内容(所以只能存一条消息)

(2)流程

  1. 创建对象
from multiprocessing import Value

obj = Value(ctype, obj)

     功能:开辟共享内存空间
     参数:ctype:字符串,要转变的c的类型code(对照ctype表)
                obj:共享内存初始值
     返回值:返回共享内存对象

  1. 通过obj.value访问内存对象

(3)代码示例

from multiprocessing import	Process, Value
import time
import random


# 共享内存对象初始存放2000
money = Value('i',2000)

# 存钱进程
def deposite():
	for i in range(100):
		time.sleep(0.05)
		# 对value操作实际就是操作共享内存
		money.value += random.randint(1, 200)
		
# 取钱进程
def withdraw():
	for i in range(100):
		time.sleep(0.04)
		# 对value操作实际就是操作共享内存
		money.value -= random.randint(1, 200)


d=Process(target=deposite)
w=Process(target=withdraw)

d.start()
w.start()

d.join()
w.join()

print(money.value)
  1. 创建对象
from multiprocessing import Array

obj = Array(ctype, obj)

功能:开辟共享内存空间
参数:ctype:要转换的类型
           obj:要存入共享内存的数据
                列表:将列表存入共享内存 ,要求类型一致
                整数:在共享内存中开辟几个单元的空间

  1. 通过索引下标访问内存对象
from multiprocessing import	Process, Array
import time

# 创建共享内存,列表是共享内存的初始值
# shm = Array('i',[1,2,3,4,5])

# 共享内存中开辟出5个空间
shm = Array('i', 5)

def fun():
	for i in shm:
		print(i)
	# 修改共享内存内容
	shm[3] = 1000

p = Process(target=fun)

p.start()
p.join()

for i in shm:
	print(i)

进程间通讯方式

4、进程池中的Queue

  • 如果要使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue()
# -*- coding:utf-8 -*-

# 修改import中的Queue为Manager
from multiprocessing import Manager, Pool
import os, time, random

def reader(q):
    print("reader启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
    # 获取队列中消息数量
    for i in range(q.qsize()):
    	# 当队列消息数量满时阻塞
        print("reader从Queue获取到消息:%s" % q.get(True))

def writer(q):
    print("writer启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
    for i in "abcdefg":
        q.put(i)

if __name__=="__main__":
    print("(%s) start" % os.getpid())
    q = Manager().Queue()  # 使用Manager中的Queue
    po = Pool()
    po.apply_async(writer, (q,))

    time.sleep(1)  # 先让上面的任务向Queue存入数据,然后再让下面的任务开始从中取数据

    po.apply_async(reader, (q,))
    po.close()
    po.join()
    print("(%s) End" % os.getpid())

运行结果:

[email protected]-virtual-machine:~/Process_mode$ python3 pool.py 
(17908) start
writer启动(17914),父进程为(17908)
reader启动(17915),父进程为(17908)
reader从Queue获取到消息:a
reader从Queue获取到消息:b
reader从Queue获取到消息:c
reader从Queue获取到消息:d
reader从Queue获取到消息:e
reader从Queue获取到消息:f
reader从Queue获取到消息:g
(17908) End