IO模型
IO模型
IO指的是输入输出,输入输出都是一个耗时的操作,程序中一旦遇到了输入输出就会被阻塞,导致程序效率降低,IO模型也就是输入输出模型,是为了提高IO效率而出现。
IO本质上也分为不同类型,其中最典型的就是网络IO,由于网络速度比运算速度慢很多,所以大量的时间都是在等待网络IO,这也是我们要关注的重点!
copyData与waitData
网络通讯时,应用程序的数据是交由操作系统来进行发送的,同样接收数据时也是操作系统先收到消息。
为了更好的理解IO模型,需要先了解数据的接收和发送经历了哪些阶段过程。
1.发送数据时 send sendto
数据从应用程序内存copy到系统缓存,后续操作由操作系统完成,只需经历copydata阶段
import socket c = socket.socket() c.connect(("127.0.0.1",9898)) while True: data = input(":") if not data:continue c.send(data.encode("utf-8")) # 阻塞函数 速度较快 感觉不到阻塞
2.接收数据时 recv recvfrom accept
向操作系统发起读取操作后,必须要等待数据到达缓冲区,然后在从缓冲区copy到应用程序内存
所以接收数据 需要先经历waitData 再经历copyData
import socket s = socket.socket() s.bind(("127.0.0.1",9898)) s.listen() while True: c,addr = s.accept() # 阻塞 while True: data = c.recv(1024) # 阻塞 print(data.decode("utf-8"))
IO模型分类
阻塞IO
阻塞IO指的是程序一旦发起了相关的调用后,必须在阻塞在原地,等待IO操作结束后才能继续执行。
目前所学的所有TCP程序都属于阻塞IO模型(gevent除外),默认情况下socket提供的一系列方法都是阻塞的
如:recv send accept等,
需要强调的是:无论是什么样的IO模型都必须经历waitData和copyData,区别就在于对这两个阶段的处理方式不同。
阻塞IO具体流程如下:
大量的时间都耗费在等待waitData和 copyData上,而阻塞IO必须在原地等待,所以该模型的效率不高。
在TCP程序中使用该模型会明显感觉到效率低,一个客户端没有结束前,其他客户端是无法连接成功的。
多线程/多进程
在学习了线程和进程之后,我们可以将接受请求、收发数据拆分到不同线程中,来保证每一个客户端能够同时享受服务。
多线程虽然实现了并发访问,但是本质上并没有解决IO的阻塞问题,仅仅是把阻塞代码丢给另外一个线程,来避开了IO阻塞问题。
另一个问题是线程的创建时需要消耗系统资源的,所以我们不可能无限的去开启线程来处理客户端。
优点:解决了服务器不能并发处理客户端请求的问题
弊端:客户并发量太大将导致系统资源耗尽,并且没有解决阻塞问题
线程池
这就有了线程池,进程池,需要思考的是,线程池就一定比直接开线程效率高吗?
并不是,线程池主要功能是,限制线程的最大数量,保证服务器稳定运行,以及避免重复的创建和销毁线程,可以起到一些优化效果,但是对于IO效率是没有太大影响的。
优点:保证了服务器的稳定运行,减少频繁创建销毁线程的开销
弊端:当客户并发量高出系统承受线程数量极限时,后续的客户端将无法正常访问
上述解决方案都提高了效率,但是本质上还是属于阻塞IO模型。只是回避了IO阻塞问题。
并且由于GIL锁的存在,TCP程序中使用多线程不如单线程效率更高,但如何使得单线程可以并发处理多个客户端的请求呢,这便需要非阻塞IO了
非阻塞IO
非阻塞即 即时遇到IO操作不会进入阻塞状态。
例如:当发起了一个recv调用时,如果数据已经准备好了,就直接返回数据,如果没有准备好就返回错误信息,而recv函数将不会有任何阻塞效果,这样一来,就可以完全避开阻塞,在数据没有准备好的时候去执行其他任务,以此来提高效率。
非阻塞IO模型流程如下:
其中两个问题需要考虑:
1.如何使得socket变成非阻塞
socket.setblock(False)
2.如何获知数据没有准备好
捕获异常BlockingIOError
案例:
# 服务器 import socket import time s = socket.socket() s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) s.bind(("127.0.0.1",9898)) s.listen() s.setblocking(False) r_list = [] while True: time.sleep(0.1) try: c,addr = s.accept() r_list.append(c) except BlockingIOError: print("干点别的...",len(r_list)) for c in r_list: try: data = c.recv(1024) if not data: c.close() r_list.remove(c) print("断开连接........") continue print(data.decode("utf-8")) c.send(data.upper()) except BlockingIOError: continue except ConnectionResetError: c.close() r_list.remove(c) print("断开连接........")
#客户端: import socket import os c = socket.socket() c.connect(("127.0.0.1",9898)) while True: data = "%s hello" % os.getpid() if not data:continue c.send(data.encode("utf-8")) msg = c.recv(1024) print(msg.decode("utf-8"))
改进1:
上述代码可以完成单线程并发处理多个客户端,但是有一个影藏的bug,即在迭代期间操作容器元素,因为需要在客户端断开连接后从列表中删除客户端对象。
测试:
# 无法正确删除 li = [1,2,3,4,5] for i in li: print(i) li.remove(i) print(li) # 字典直接抛出异常 dic = {"name":"jack"} for k in dic: dic.pop(k) # 解决方案1:将要删除的元素存储到一个新列表中 遍历完成后在统一删除 li = [1,2,3,4,5] rm_list = [] for i in li[:]: rm_list.append(i) for i in rm_list: li.remove(i) print(li) # 解决方案2:遍历新列表 删除旧列表 li = [1,2,3,4,5] for i in li[:]: print(i) li.remove(i) print(li)
改进2:
思考c.send(data.upper())代码是不是阻塞的?
send是把数据交给操作系统缓存,也就是CopyData阶段,也是一个阻塞操作,而由于当前为非阻塞模式,在一些极端情况下可能会抛出BlockingIOError,例如缓冲区没有足够的容量时。以防万一,我们不能直接在recv下面发送数据,因为异常被捕获后直接执行了continue导致数据丢失。解决的方案把要发送的数据先存储到容器中统一发送。
import socket import time s = socket.socket() s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) s.bind(("127.0.0.1",9898)) s.listen() s.setblocking(False) r_list = [] w_list = [] while True: time.sleep(0.05) try: c,addr = s.accept() r_list.append(c) except BlockingIOError: print("干点别的...",len(r_list)) # 接收数据 for c in r_list: try: data = c.recv(1024) if not data: c.close() r_list.remove(c) print("断开连接........") continue print(data.decode("utf-8")) w_list.append((c,data.upper())) # 把要发送的数据存储到容器中 except BlockingIOError: continue except ConnectionResetError: c.close() r_list.remove(c) print("断开连接........") # 发送数据 for item in w_list[:]: try: item[0].send(item[1]) w_list.remove(item) except BlockingIOError: # 缓冲区不足 导致阻塞 continue except ConnectionResetError: # 客户端异常断开 item[0].close() w_list.remove(item) r_list.remove(item[0])
至此我们就基于非阻塞IO模型编写出了一个支持单线程并发的TCP程序,并且效率非常高,但是问题在于,改程序将导致CPU被大量的占用,并且很多时候是无效的占用,机试没有任何客户端需要处理也处于疯狂的循环中。因为要不断的去循环系统数据是否准备好。
IO多路复用
多路复用最也是要用单线程来处理客户端并发,与其他模型相比多出了select这个角色,
程序不再直接问系统要数据,而是先发起一个select调用,select会阻塞直到其中某个socket准备就绪,此时应用程序再发起系统调用来获取数据,由于select已经帮我们确认了某个socket一定是就绪了,所以后续的recv send等操作可以立即完成,不会阻塞。
简单的说,select相当于一个中间者,专门帮你看着socket,哪个socket准备好了select就返回哪个。
你可以把select当做托儿所,把你的socket交给它看管,当某个socket要上厕所或要吃饭时,select会把它交给你。
案例:
# 服务器 import socket import time import select s = socket.socket() s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) s.bind(("127.0.0.1",9898)) s.listen() s.setblocking(False) r_list = [s] w_list = [] datas = {} while True: reads,writes,_ = select.select(r_list,w_list,[]) # 处理可读的socket 即可以执行recv的 for i in reads: if i == s: c,addr = i.accept() r_list.append(c) else: try: data = i.recv(1024) if not data: r_list.remove(i) continue w_list.append(i) datas[i] = data.upper() except ConnectionResetError: r_list.remove(i) # 处理可写的 for i in writes: try: i.send(datas.pop(i)) except ConnectionResetError: i.close() datas.pop(i) r_list.remove(i) finally: w_list.remove(i)
#客户端 import socket import os import time c = socket.socket() c.connect(("127.0.0.1",9898)) while True: time.sleep(0.2) data = "%s hello" % os.getpid() if not data:continue c.send(data.encode("utf-8")) msg = c.recv(1024) print(msg.decode("utf-8"))
在Cpython中由于有GIL 所以 协程或者是 多路复用的效率都会高于线程或线程池。
异步IO
###