IO模型

IO模型

IO指的是输入输出,输入输出都是一个耗时的操作,程序中一旦遇到了输入输出就会被阻塞,导致程序效率降低,IO模型也就是输入输出模型,是为了提高IO效率而出现。

IO本质上也分为不同类型,其中最典型的就是网络IO,由于网络速度比运算速度慢很多,所以大量的时间都是在等待网络IO,这也是我们要关注的重点!

copyData与waitData

网络通讯时,应用程序的数据是交由操作系统来进行发送的,同样接收数据时也是操作系统先收到消息。

IO模型

为了更好的理解IO模型,需要先了解数据的接收和发送经历了哪些阶段过程。

1.发送数据时 send sendto

数据从应用程序内存copy到系统缓存,后续操作由操作系统完成,只需经历copydata阶段

import socket
​
c = socket.socket()
c.connect(("127.0.0.1",9898))
​
while True:
    data = input(":")
    if not data:continue
    c.send(data.encode("utf-8")) # 阻塞函数 速度较快 感觉不到阻塞

 

2.接收数据时 recv recvfrom accept

向操作系统发起读取操作后,必须要等待数据到达缓冲区,然后在从缓冲区copy到应用程序内存

所以接收数据 需要先经历waitData 再经历copyData

import socket
​
s = socket.socket()
s.bind(("127.0.0.1",9898))
s.listen()
​
while True:
    c,addr = s.accept() # 阻塞
    while True:
        data = c.recv(1024)  # 阻塞
        print(data.decode("utf-8"))

 

 

IO模型分类

阻塞IO

阻塞IO指的是程序一旦发起了相关的调用后,必须在阻塞在原地,等待IO操作结束后才能继续执行。

目前所学的所有TCP程序都属于阻塞IO模型(gevent除外),默认情况下socket提供的一系列方法都是阻塞的

如:recv send accept等,

需要强调的是:无论是什么样的IO模型都必须经历waitData和copyData,区别就在于对这两个阶段的处理方式不同。

阻塞IO具体流程如下: IO模型

 

大量的时间都耗费在等待waitData和 copyData上,而阻塞IO必须在原地等待,所以该模型的效率不高。

在TCP程序中使用该模型会明显感觉到效率低,一个客户端没有结束前,其他客户端是无法连接成功的。

多线程/多进程

在学习了线程和进程之后,我们可以将接受请求、收发数据拆分到不同线程中,来保证每一个客户端能够同时享受服务。

多线程虽然实现了并发访问,但是本质上并没有解决IO的阻塞问题,仅仅是把阻塞代码丢给另外一个线程,来避开了IO阻塞问题。

另一个问题是线程的创建时需要消耗系统资源的,所以我们不可能无限的去开启线程来处理客户端。

优点:解决了服务器不能并发处理客户端请求的问题

弊端:客户并发量太大将导致系统资源耗尽,并且没有解决阻塞问题

线程池

这就有了线程池,进程池,需要思考的是,线程池就一定比直接开线程效率高吗?

并不是,线程池主要功能是,限制线程的最大数量,保证服务器稳定运行,以及避免重复的创建和销毁线程,可以起到一些优化效果,但是对于IO效率是没有太大影响的。

优点:保证了服务器的稳定运行,减少频繁创建销毁线程的开销

弊端:当客户并发量高出系统承受线程数量极限时,后续的客户端将无法正常访问

 

上述解决方案都提高了效率,但是本质上还是属于阻塞IO模型。只是回避了IO阻塞问题。

并且由于GIL锁的存在,TCP程序中使用多线程不如单线程效率更高,但如何使得单线程可以并发处理多个客户端的请求呢,这便需要非阻塞IO了

 

非阻塞IO

非阻塞即 即时遇到IO操作不会进入阻塞状态。

例如:当发起了一个recv调用时,如果数据已经准备好了,就直接返回数据,如果没有准备好就返回错误信息,而recv函数将不会有任何阻塞效果,这样一来,就可以完全避开阻塞,在数据没有准备好的时候去执行其他任务,以此来提高效率。

非阻塞IO模型流程如下:

IO模型

其中两个问题需要考虑:

1.如何使得socket变成非阻塞

socket.setblock(False)

2.如何获知数据没有准备好

捕获异常BlockingIOError

案例:

# 服务器
import socket
import time
​
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
s.bind(("127.0.0.1",9898))
s.listen()
s.setblocking(False)
​
r_list = []
while True:
    time.sleep(0.1)
    try:
        c,addr = s.accept()
        r_list.append(c)
    except BlockingIOError:
        print("干点别的...",len(r_list))
        for c in r_list:
            try:
                data = c.recv(1024)
                if not data:
                    c.close()
                    r_list.remove(c)
                    print("断开连接........")
                    continue
                print(data.decode("utf-8"))
                c.send(data.upper())
            except BlockingIOError:
                continue
            except ConnectionResetError:
                c.close()
                r_list.remove(c)
                print("断开连接........")
#客户端:
import socket
import os
c = socket.socket()
c.connect(("127.0.0.1",9898))
​
while True:
    data = "%s hello" % os.getpid()
    if not data:continue
    c.send(data.encode("utf-8"))
    msg = c.recv(1024)
    print(msg.decode("utf-8"))

改进1:

上述代码可以完成单线程并发处理多个客户端,但是有一个影藏的bug,即在迭代期间操作容器元素,因为需要在客户端断开连接后从列表中删除客户端对象。

测试:

# 无法正确删除
li = [1,2,3,4,5]
for i in li:
    print(i)
    li.remove(i)
print(li)
​
# 字典直接抛出异常
dic = {"name":"jack"}
for k in dic:
    dic.pop(k)
​
​
# 解决方案1:将要删除的元素存储到一个新列表中 遍历完成后在统一删除
li = [1,2,3,4,5]
rm_list = []
for i in li[:]:
    rm_list.append(i)
​
for i in rm_list:
    li.remove(i)
print(li)
​
# 解决方案2:遍历新列表 删除旧列表
li = [1,2,3,4,5]
for i in li[:]:
    print(i)
    li.remove(i)
print(li)

改进2:

思考c.send(data.upper())代码是不是阻塞的?

send是把数据交给操作系统缓存,也就是CopyData阶段,也是一个阻塞操作,而由于当前为非阻塞模式,在一些极端情况下可能会抛出BlockingIOError,例如缓冲区没有足够的容量时。以防万一,我们不能直接在recv下面发送数据,因为异常被捕获后直接执行了continue导致数据丢失。解决的方案把要发送的数据先存储到容器中统一发送。

import socket
import time
​
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
s.bind(("127.0.0.1",9898))
s.listen()
s.setblocking(False)
​
r_list = []
w_list = []
while True:
    time.sleep(0.05)
    try:
        c,addr = s.accept()
        r_list.append(c)
    except BlockingIOError:
        print("干点别的...",len(r_list))
​
​
        #  接收数据
        for c in r_list:
            try:
                data = c.recv(1024)
                if not data:
                    c.close()
                    r_list.remove(c)
                    print("断开连接........")
                    continue
                print(data.decode("utf-8"))
                w_list.append((c,data.upper())) # 把要发送的数据存储到容器中
            except BlockingIOError:
                continue
            except ConnectionResetError:
                c.close()
                r_list.remove(c)
                print("断开连接........")
​
        # 发送数据
        for item in w_list[:]:
            try:
                item[0].send(item[1])
                w_list.remove(item)
            except BlockingIOError: # 缓冲区不足 导致阻塞
                continue
            except ConnectionResetError: # 客户端异常断开
                item[0].close()
                w_list.remove(item)
                r_list.remove(item[0])

至此我们就基于非阻塞IO模型编写出了一个支持单线程并发的TCP程序,并且效率非常高,但是问题在于,改程序将导致CPU被大量的占用,并且很多时候是无效的占用,机试没有任何客户端需要处理也处于疯狂的循环中。因为要不断的去循环系统数据是否准备好。

 

IO多路复用

多路复用最也是要用单线程来处理客户端并发,与其他模型相比多出了select这个角色,

程序不再直接问系统要数据,而是先发起一个select调用,select会阻塞直到其中某个socket准备就绪,此时应用程序再发起系统调用来获取数据,由于select已经帮我们确认了某个socket一定是就绪了,所以后续的recv send等操作可以立即完成,不会阻塞。

简单的说,select相当于一个中间者,专门帮你看着socket,哪个socket准备好了select就返回哪个。

你可以把select当做托儿所,把你的socket交给它看管,当某个socket要上厕所或要吃饭时,select会把它交给你。

IO模型

案例:

# 服务器
import socket
import time
import select
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
s.bind(("127.0.0.1",9898))
s.listen()
s.setblocking(False)
​
r_list = [s]
w_list = []
datas = {}
​
while True:
    reads,writes,_ = select.select(r_list,w_list,[])
    # 处理可读的socket 即可以执行recv的
    for i in reads:
        if i == s:
            c,addr = i.accept()
            r_list.append(c)
        else:
            try:
                data = i.recv(1024)
                if not data:
                    r_list.remove(i)
                    continue
​
                w_list.append(i)
                datas[i] = data.upper()
            except ConnectionResetError:
                r_list.remove(i)
​
    # 处理可写的
    for i in writes:
        try:
            i.send(datas.pop(i))
        except ConnectionResetError:
            i.close()
            datas.pop(i)
            r_list.remove(i)
        finally:
            w_list.remove(i)
#客户端
import socket
import os
import time
c = socket.socket()
c.connect(("127.0.0.1",9898))
​
while True:
    time.sleep(0.2)
    data = "%s hello" % os.getpid()
    if not data:continue
    c.send(data.encode("utf-8"))
    msg = c.recv(1024)
    print(msg.decode("utf-8"))

 

在Cpython中由于有GIL 所以 协程或者是 多路复用的效率都会高于线程或线程池。

 

异步IO

 

IO模型

###