Netty——ByteBuf学习1

Java原生NIO库中的ByteBuffer存在的一些缺点,导致在使用时不是很方面,例如:

  1. ByteBuffer中只有一个指针,用户在读取时需要调用flip()等操作移动指针的位置,否则很容易导致读取出的数据不正确
  2. ByteBuffer在put操作时不能自动扩充容量,即只能在创建时就确定,在后续的使用过程中不太灵活

Netty实现了自己的缓冲区类——ByteBuf

其中,使用了两个指针,一个是读指针——readerIndex,一个是写指针——writerIndex。两个指针之间互不干扰。通过这两个指针,可以将整个缓冲区划分成三个区域,0-readerIndex指的是已经读取完数据的区域,这部分区域可以复用;readerIndex-writerIndex指的是可读区域;writerIndex-capacity指的是可写区域。因为对缓冲区的创建和释放是个耗时的操作,所以为了能尽可能地复用缓存区域,ByteBuf提供了一个discardReadBytes操作,这个操作会重新整理这片缓冲区的内存,将可读区域的内存复制到缓冲区开始的位置,整理完成后readerIndex变为0,可写区域变大,如下是在AbstractByteBuf中关于discardReadBytes的实现代码,不同ByteBuf的实现类中处理机制可能不一样:

public ByteBuf discardReadBytes() {
        this.ensureAccessible();
        if (this.readerIndex == 0) {
            return this;
        } else {
            if (this.readerIndex != this.writerIndex) {
            	//内存复制操作
                this.setBytes(0, this, this.readerIndex, this.writerIndex - this.readerIndex);
                this.writerIndex -= this.readerIndex;
                this.adjustMarkers(this.readerIndex);
                this.readerIndex = 0;
            } else {
                this.adjustMarkers(this.readerIndex);
                this.writerIndex = this.readerIndex = 0;
            }

            return this;
        }
    }

当进行读操作的时候,可能需要对之前的操作进行回滚时。回滚操作实际上就是重新设置读索引的操作。所以在成员变量中设置了mark变量,调用mark变量就是将当前的readerIndex赋值给mark变量,调用reset时就是将当前mark变量的值赋值给readerIndex。
ByteBuf还支持对缓冲区内容的随机读写操作,但是需要注意的就是随机写操作set并不会对ByteBuf缓冲区的长度进行自动扩容。

ByteBuf的分类:

  1. 按照内存分配的方式主要分为两类,一类是HeapByteBuf,主要的特点就是采用这种分配方式的话分配内存和回收的速度会比较快,但是在进行socketChannel通信时需要将数据复制到内核区,影响性能;另一类是DirectByteBuf,相比于上一类,它的分配和回收速度就要慢一些,但是少了一次内存复制操作。
  2. 按照内存回收的角度来看主要也分为两类,即基于对象池的ByteBuf和普通的ByteBuf。因为GC会影响效率,所以基于对象池的ByteBuf采用的方案是循环利用创建好的ByteBuf对象,在高负载的情况下也可以使得GC操作比较平稳

ByteBuf类继承关系

Netty——ByteBuf学习1

AbstractByteBuf源码分析

成员变量:

private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractByteBuf.class);
    private static final String LEGACY_PROP_CHECK_ACCESSIBLE = "io.netty.buffer.bytebuf.checkAccessible";
    private static final String PROP_CHECK_ACCESSIBLE = "io.netty.buffer.checkAccessible";
    static final boolean checkAccessible;//检查是否这段缓冲区可以获得,即有没有被释放
    private static final String PROP_CHECK_BOUNDS = "io.netty.buffer.checkBounds";
    private static final boolean checkBounds;//检查是否越界
    static final ResourceLeakDetector<ByteBuf> leakDetector;//检查对象是否泄漏
    int readerIndex;//读指针
    int writerIndex;//写指针
    private int markedReaderIndex;//保存回滚的读指针
    private int markedWriterIndex;//保存回滚的写指针
    private int maxCapacity;//保存缓冲区最大容量

对可读空间的校验代码,首先判断传入的长度,如果小于0,直接抛出非法参数异常;然后判断当前缓存对象是否已经被释放,抛出非法引用数异常;最后再判断要读取的长度是否超出可读区域,如果超出,抛出越界异常。

protected final void checkReadableBytes(int minimumReadableBytes) {
        if (minimumReadableBytes < 0) {
            throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)");
        } else {
            this.checkReadableBytes0(minimumReadableBytes);
        }
    }
    private void checkReadableBytes0(int minimumReadableBytes) {
        this.ensureAccessible();
        if (checkBounds && this.readerIndex > this.writerIndex - minimumReadableBytes) {
            throw new IndexOutOfBoundsException(String.format("readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s", this.readerIndex, minimumReadableBytes, this.writerIndex, this));
        }
    }
    protected final void ensureAccessible() {
        if (checkAccessible && this.internalRefCnt() == 0) {
            throw new IllegalReferenceCountException(0);
        }
    }

读取字节数的一个方法,首先检查长度是否合法,调用getBytes方法将长度为length的数据复制到dst字节数组中,这里用到了策略模式,即getBytes方法是子类可以根据自己的情况来实现。读取成功之后需要改变readerIndex的值。

public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
        this.checkReadableBytes(length);
        this.getBytes(this.readerIndex, dst, dstIndex, length);
        this.readerIndex += length;
        return this;
    }

判断是否可以写入的方法。首先检查长度是否小于0,如果小于0,直接抛出非法参数异常;再判断当前对象是否已经被释放,如果被释放,则直接抛出非法引用数异常;如果当前写入的长度大于当前可写的长度,此时需要扩容。

public ByteBuf ensureWritable(int minWritableBytes) {
        if (minWritableBytes < 0) {
            throw new IllegalArgumentException(String.format("minWritableBytes: %d (expected: >= 0)", minWritableBytes));
        } else {
            this.ensureWritable0(minWritableBytes);
            return this;
        }
    }
    final void ensureWritable0(int minWritableBytes) {
        this.ensureAccessible();
        if (minWritableBytes > this.writableBytes()) {
            if (checkBounds && minWritableBytes > this.maxCapacity - this.writerIndex) {
                throw new IndexOutOfBoundsException(String.format("writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", this.writerIndex, minWritableBytes, this.maxCapacity, this));
            } else {
                int newCapacity = this.alloc().calculateNewCapacity(this.writerIndex + minWritableBytes, this.maxCapacity);
                this.capacity(newCapacity);
            }
        }
    }
    protected final void ensureAccessible() {
        if (checkAccessible && this.internalRefCnt() == 0) {
            throw new IllegalReferenceCountException(0);
        }
    }

当缓冲区大小不够时的扩容函数代码如下,扩容时设置4M为阈值,在小于4M时,采用倍增法,由于此时内存较小,带来的影响不大,不会造成内存的大量浪费,但是大于4M时,防止内存浪费,采用的是每次增加4M

public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
        if (minNewCapacity < 0) {//传入的扩展后的容量小于0,抛出异常
            throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)");
        } else if (minNewCapacity > maxCapacity) {//扩展后的容量比缓冲区最大容量大,抛出异常
            throw new IllegalArgumentException(String.format("minNewCapacity: %d (expected: not greater than maxCapacity(%d)", minNewCapacity, maxCapacity));
        } else {
            int threshold = 4194304;//设置阈值为4M
            if (minNewCapacity == 4194304) {//如果扩展后的容量为4M,直接返回
                return 4194304;
            } else {
                int newCapacity;
                if (minNewCapacity > 4194304) {//如果扩展后的容量比4M大
                    newCapacity = minNewCapacity / 4194304 * 4194304;
                    if (newCapacity > maxCapacity - 4194304) {//如果扩展后的容量比缓冲区最大容量大,直接设置扩展后的容量为缓冲区容量
                        newCapacity = maxCapacity;
                    } else {
                        newCapacity += 4194304;
                    }

                    return newCapacity;
                } else {
                    for(newCapacity = 64; newCapacity < minNewCapacity; newCapacity <<= 1) {
                    }

                    return Math.min(newCapacity, maxCapacity);
                }
            }
        }
    }