Netty实战 IM即时通讯系统(七)数据传输载体ByteBuf介绍

##

Netty实战 IM即时通讯系统(七)数据传输载体ByteBuf介绍

零、 目录

  1. IM系统简介
  • Netty 简介
  • Netty 环境配置
  • 服务端启动流程
  • 客户端启动流程
  • 实战: 客户端和服务端双向通信
  • 数据传输载体ByteBuf介绍
  • 客户端与服务端通信协议编解码
  • 实现客户端登录
  • 实现客户端与服务端收发消息
  • pipeline与channelHandler
  • 构建客户端与服务端pipeline
  • 拆包粘包理论与解决方案
  • channelHandler的生命周期
  • 使用channelHandler的热插拔实现客户端身份校验
  • 客户端互聊原理与实现
  • 群聊的发起与通知
  • 群聊的成员管理(加入与退出,获取成员列表)
  • 群聊消息的收发及Netty性能优化
  • 心跳与空闲检测
  • 总结
  • 扩展

七、 数据传输载体ByteBuf 介绍

  1. 前面的小节中我们了解到Netty的数据读写都是以ByteBuf 为单位进行交互的 , 我们接下来就剖析一下ByteBuf

  2. ByteBuf结构

    1. 首先我们来了解一下ByteBuf 结构Netty实战 IM即时通讯系统(七)数据传输载体ByteBuf介绍
      1. 以上就是一个ByteBuf 的结构图 , 从上面这幅图中可以看到:
        1. ByteBuf 是一个字节容器 , 容器里的数据分为三部分:
          1. 第一部分是已经丢弃的字节 , 这部分数据时无效的
          2. 第二部分是可读字节 , 这部分数据是ByteBuf 的主体数据 , 从ByteBuf里面读取的数据来自这一部分
          3. 最后一部分虚线表示的是该ByteBuf 最多还能扩容多少容量
        2. 以上三段内容是被两个指针给划分出来的 , 从左到右 , 依次是 读指针(readIndex) 、 写指针(writeIndex) , 然后还有一个变量capacity , 表示ByteBuf底层内存的总容量
        3. 从ByteBuf 中每读取一个字节 , readIndex自增1 , ByteBuf里面总共有writeIndez-readIndex个字节可读 , 由此可以得知 writeIndex = readIndex 时 , ByteBuf 不可读
        4. 写数据是从writeIndex 指向的部分开始写 , 每写一个字节writeIndex自增1 , 直到增大到capacity , 这个时候表示ByteBuff不可写了
        5. ByteBuf 里面其实还有一个参数maxCapacityv, 当向ByteBuf写数据的时候 , 如果容量不足 , 那么这个时候可以进行扩容 , 直到capacity扩容到macCapacity , 超过MaxCapacity 就会报错
      2. Netty使用这个数据结构可以有效的区分可读数据和可写数据 , 读写之间相互没有冲突 , 当然 , ByteBuf 只是对二进制数据的抽象 , 具体底层实现我们在下面的小节讲到 , 这一小节我们只要知道Netty关于数据读写之人ByteBuf , 下面我们就来学习一下BuyteBuf 的常用API
  3. Api

    1. 容量API
      1. capacity(): 表示ByteBuf底层占用了多少字节的内存(包括 丢弃的字节 , 可读的字节 , 可写的字节) , 不同的底层实现由不同的算法机制 , 后面我们将|ByteBuf 分类的时候会讲到
      2. maxCapacity():表示ByteBuf 最大能占用多少字节的内存 , 当向ByteBuf写数据时 , 如果发现容量不够会进行扩容 , 直到扩容到macCapacity , 超过这个数 , 就抛异常。
      3. readableBytes() 与 isReadable():readableBytes()返回可读的字节数 , 他的值等于writeIndex - readIndex , 如果两者相等 , 则不可读 , 这时isReadable()返回false
      4. writableBytes()、 isWritable() 与 maxWritableBytes() : writeable() 返回当前可写的字节数 , 他的值等于 capacity - writeable 如果两者相等 , 则表示不可写 , 这个时候isWriteable()返回为false , 但是这个时候并代表不能忘ByteBuf 中填充数据了 , 如果发现往ByteBuf 中写数据写不进去的话 , Netty会自动扩容ByteBuf , 直到扩容到底层的内存大小为maxCapacity , 而maxCapacity()就表示可写的最大字节数 , 他的值就等于maxCapacity
    2. 读写指针相关的API
      1. readerIndex() 和 readerIndex(int): 前者表示获取当前读指针位置 , 后者表示设置读指针位置

      2. writeIndex() 与 writeIndex(int): 前者表示获取当前写指针位置 , 后者表示设置写指针位置

      3. markReaderIndex() 与 resetReaderIndex(): 前者表示把当前的读指针保存起来 , 后者表示 把当前的读指针恢复到之前保存的值

         // 代码片段1
         int readerIndex = buffer.readerIndex();
         // .. 其他操作
         buffer.readerIndex(readerIndex);
         
         
         // 代码片段二
         buffer.markReaderIndex();
         // .. 其他操作
         buffer.resetReaderIndex();
        
        1. 推荐使用代码片段二这种方式 , 不需要自己定义变量
    3. 读写API
      1. writeBytes(byte[] src) 与 buffer.readBytes(byte[] dst) :writeBytes(bs) 表示把字节数组bs中的字节全部写到ByteBuf , 而readBytes()指的是把ByteBuf里的数据全部读取到dst , 这里dst的字节数组的大小通常等于readableBytes() , 而src的长度通常小于writeableBytes();

      2. writeByte(byte b) 与 readByte():writeByte()表示往ByteBuf中写入一个字节 , 类似的API还有writeBoolean() , writeChar() , writeShort()、writeLong()、writeFloat()、writeDouble() 与 readBoolean()、readChar()、readShort()、readInt()、readLong()、readFloat()、readDouble() 这里就不一一赘述了,相信读者应该很容易理解这些 API 。 与读写 API 类似的 API 还有 getBytes、getByte() 与 setBytes()、setByte() 系列,唯一的区别就是 get/set 不会改变读写指针,而 read/write 会改变读写指针,这点在解析数据的时候千万要注意

      3. release() 与 retain(): 由于 Netty 使用了堆外内存,而堆外内存是不被 jvm 直接管理的,也就是说申请到的内存无法被垃圾回收器直接回收,所以需要我们手动回收。有点类似于c语言里面,申请到的内存必须手工释放,否则会造成内存泄漏。Netty 的 ByteBuf 是通过引用计数的方式管理的,如果一个 ByteBuf 没有地方被引用到,需要回收底层内存。默认情况下,当创建完一个 ByteBuf,它的引用为1,然后每次调用 retain() 方法, 它的引用就加一, release() 方法原理是将引用计数减一,减完之后如果发现引用计数为0,则直接回收 ByteBuf 底层的内存。

      4. slice()、duplicate()、copy():这三个方法通常情况会放到一起比较,这三者的返回值都是一个新的 ByteBuf 对象

        1. slice() 方法从原始 ByteBuf 中截取一段,这段数据是从 readerIndex 到 writeIndex,同时,返回的新的 ByteBuf 的最大容量 maxCapacity 为原始 ByteBuf 的 readableBytes()
        2. duplicate() 方法把整个 ByteBuf 都截取出来,包括所有的数据,指针信息
        3. slice() 方法与 duplicate() 方法的相同点是:底层内存以及引用计数与原始的 ByteBuf 共享,也就是说经过 slice() 或者 duplicate() 返回的 ByteBuf 调用 write 系列方法都会影响到 原始的 ByteBuf,但是它们都维持着与原始 ByteBuf 相同的内存引用计数和不同的读写指针
        4. slice() 方法与 duplicate() 不同点就是:slice() 只截取从 readerIndex 到 writerIndex 之间的数据,它返回的 ByteBuf 的最大容量被限制到 原始 ByteBuf 的 readableBytes(), 而 duplicate() 是把整个 ByteBuf 都与原始的 ByteBuf 共享
        5. slice() 方法与 duplicate() 方法不会拷贝数据,它们只是通过改变读写指针来改变读写的行为,而最后一个方法 copy() 会直接从原始的 ByteBuf 中拷贝所有的信息,包括读写指针以及底层对应的数据,因此,往 copy() 返回的 ByteBuf 中写数据不会影响到原始的 ByteBuf
        6. slice() 和 duplicate() 不会改变 ByteBuf 的引用计数,所以原始的 ByteBuf 调用 release() 之后发现引用计数为零,就开始释放内存,调用这两个方法返回的 ByteBuf 也会被释放,这个时候如果再对它们进行读写,就会报错。因此,我们可以通过调用一次 retain() 方法 来增加引用,表示它们对应的底层的内存多了一次引用,引用计数为2,在释放内存的时候,需要调用两次 release() 方法,将引用计数降到零,才会释放内存
        7. 这三个方法均维护着自己的读写指针,与原始的 ByteBuf 的读写指针无关,相互之间不受影响
      5. retainedSlice() 与 retainedDuplicate():相信读者应该已经猜到这两个 API 的作用了,它们的作用是在截取内存片段的同时,增加内存的引用计数,分别与下面两段代码等价

         // retainedSlice 等价于
         slice().retain();
         
         // retainedDuplicate() 等价于
        
        1. 使用到 slice 和 duplicate 方法的时候,千万要理清内存共享,引用计数共享,读写指针不共享几个概念,下面举两个常见的易犯错的例子

               多次释放
           
           Buffer buffer = xxx;
           doWith(buffer);
           // 一次释放
           buffer.release();
           
           
           public void doWith(Bytebuf buffer) {
           // ...    
               
           // 没有增加引用计数
           Buffer slice = buffer.slice();
           
           foo(slice);
           
           }
           
           
           public void foo(ByteBuf buffer) {
               // read from buffer
               
               // 重复释放
               buffer.release();
           }
           
           这里的 doWith 有的时候是用户自定义的方法,有的时候是 Netty 的回调方法,比如 channelRead() 等等
           
               不释放造成内存泄漏
           
           Buffer buffer = xxx;
           doWith(buffer);
           // 引用计数为2,调用 release 方法之后,引用计数为1,无法释放内存 
           buffer.release();
           
           
           public void doWith(Bytebuf buffer) {
           // ...    
               
           // 增加引用计数
           Buffer slice = buffer.retainedSlice();
           
           foo(slice);
           
           // 没有调用 release
           
           }
           
           
           public void foo(ByteBuf buffer) {
               // read from buffer
           }
          
  4. 实战:

    1. 代码

       public class Test_08_ByteBuf介绍 {
       
       	public static void main(String[] args) {
       		ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 200);
       		print("allocate ByteBuf(9, 100)", buffer);
       
       		// write 方法改变写指针,写完之后写指针未到 capacity 的时候,buffer 仍然可写
       		buffer.writeBytes(new byte[] { 1, 2, 3, 4 });
       		print("writeBytes(1,2,3,4)", buffer);
       
       		// write 方法改变写指针,写完之后写指针未到 capacity 的时候,buffer 仍然可写, 写完 int 类型之后,写指针增加4
       		buffer.writeInt(10);
       		print("writeInt(10)", buffer);
       
       		// write 方法改变写指针, 写完之后写指针等于 capacity 的时候,buffer 不可写
       		buffer.writeBytes(new byte[] { 5 });
       		print("writeBytes(5)", buffer);
       
       		// write 方法改变写指针,写的时候发现 buffer 不可写则开始扩容,扩容之后 capacity 随即改变
       		buffer.writeBytes(new byte[] { 6 });
       		print("writeBytes(6)", buffer);
       
       		// get 方法不改变读写指针
       		System.out.println("getByte(3) return: " + buffer.getByte(3));
       		System.out.println("getShort(3) return: " + buffer.getShort(3));
       		System.out.println("getInt(3) return: " + buffer.getInt(3));
       		print("getByte()", buffer);
       
       		// set 方法不改变读写指针
       		buffer.setByte(buffer.readerIndex() + buffer.readableBytes() + 1, 0);
       		print("setByte()", buffer);
       
       		// read 方法改变读指针
       		byte[] dst = new byte[buffer.readableBytes()];
       		buffer.readBytes(dst);
       		print("readBytes(" + dst.length + ")", buffer);
       	}
       
       	private static void print(String action, ByteBuf buffer) {
       		System.out.println("after ===========" + action + "============");
       		System.out.println("capacity(): " + buffer.capacity());
       		System.out.println("maxCapacity(): " + buffer.maxCapacity());
       		System.out.println("readerIndex(): " + buffer.readerIndex());
       		System.out.println("readableBytes(): " + buffer.readableBytes());
       		System.out.println("isReadable(): " + buffer.isReadable());
       		System.out.println("writerIndex(): " + buffer.writerIndex());
       		System.out.println("writableBytes(): " + buffer.writableBytes());
       		System.out.println("isWritable(): " + buffer.isWritable());
       		System.out.println("maxWritableBytes(): " + buffer.maxWritableBytes());
       		System.out.println();
       	}
       }
       
       执行结果: 
      
       after ===========allocate ByteBuf(9, 100)============
       capacity(): 9
       maxCapacity(): 200
       readerIndex(): 0
       readableBytes(): 0
       isReadable(): false
       writerIndex(): 0
       writableBytes(): 9
       isWritable(): true
       maxWritableBytes(): 200
       
       after ===========writeBytes(1,2,3,4)============
       capacity(): 9
       maxCapacity(): 200
       readerIndex(): 0
       readableBytes(): 4
       isReadable(): true
       writerIndex(): 4
       writableBytes(): 5
       isWritable(): true
       maxWritableBytes(): 196
       
       after ===========writeInt(10)============
       capacity(): 9
       maxCapacity(): 200
       readerIndex(): 0
       readableBytes(): 8
       isReadable(): true
       writerIndex(): 8
       writableBytes(): 1
       isWritable(): true
       maxWritableBytes(): 192
       
       after ===========writeBytes(5)============
       capacity(): 9
       maxCapacity(): 200
       readerIndex(): 0
       readableBytes(): 9
       isReadable(): true
       writerIndex(): 9
       writableBytes(): 0
       isWritable(): false
       maxWritableBytes(): 191
       
       after ===========writeBytes(6)============
       capacity(): 64
       maxCapacity(): 200
       readerIndex(): 0
       readableBytes(): 10
       isReadable(): true
       writerIndex(): 10
       writableBytes(): 54
       isWritable(): true
       maxWritableBytes(): 190
       
       getByte(3) return: 4
       getShort(3) return: 1024
       getInt(3) return: 67108864
       after ===========getByte()============
       capacity(): 64
       maxCapacity(): 200
       readerIndex(): 0
       readableBytes(): 10
       isReadable(): true
       writerIndex(): 10
       writableBytes(): 54
       isWritable(): true
       maxWritableBytes(): 190
       
       after ===========setByte()============
       capacity(): 64
       maxCapacity(): 200
       readerIndex(): 0
       readableBytes(): 10
       isReadable(): true
       writerIndex(): 10
       writableBytes(): 54
       isWritable(): true
       maxWritableBytes(): 190
       
       after ===========readBytes(10)============
       capacity(): 64
       maxCapacity(): 200
       readerIndex(): 10
       readableBytes(): 0
       isReadable(): false
       writerIndex(): 10
       writableBytes(): 54
       isWritable(): true
       maxWritableBytes(): 190
      
  5. 总结

  6. 本小节 , 我们分析了Netty对二进制数据的抽象ByteBuf结构 , 本质上他的原理就是 , 他引用了一段内存 , 这段内存可以是对内的也可以是堆外的 , 然后引用计数来控制内存是否需要被释放 , 使用读写指针来控制ByteBuf的读写 , 可以理解为是外观模式的一种使用

  7. 基于读写指针和容量、最大可扩容容量,衍生出一系列的读写方法,要注意 read/write 与 get/set 的区别

  8. 多个 ByteBuf 可以引用同一段内存,通过引用计数来控制内存的释放,遵循谁 retain() 谁 release() 的原则

  9. 最后,我们通过一个具体的例子说明 ByteBuf 的实际使用

  10. 思考:

    1. slice 方法可能用在什么场景?欢迎留言讨论。
    2. 在哪种场景下需要我们调用retain()去增加引用计数呢?
      1. 比如,你抽象出来的一个方法,这个功能就是把bytebuf转换成一个对象,然后release,如果你想调用这个方法之后还想继续读数据,那么久需要在调用这个方法前 retain一下
    3. ByteBuf引用的内存也可以是堆内的吗?怎么指定堆内?
      1. 分配内存的时候可以调用分配堆内存的方法,ByteBufAllocator.heapBuffer() , r如果使用了堆内内存 , 则不需要手动释放
    4. 扩容每次扩多少?
      1. 从64B开始,指数扩容,直到能装下为止
    5. 为什么getshort(3) 是 1024?
      1. 执行了buffer.writeBytes(new byte[]{1, 2, 3, 4});后,往buffer里写了4个byte,再执行buffer.writeInt(12);后,因为int长度为4 bytes,所以又往buffer里写了4个byte,总共写入8个byte。而getshort(3)是从第4个byte开始,一共读取2个byte(即第4和第5个),其二进制表示为 0000 0100 0000 0000,变成十进制就是1024