nettty3.0之探究FrameDecoder 带你了解解码器的原理

使用过netty的人都知道,在使用netty的时候都或多或少碰到粘包或分包问题,这里就需要使用使用解码器了,那解码器如何工作的呢?现在就带大家探究一下,首次请看下面我画的一张frameDecoder流程图,因为源码就是按照这个思路写的。

nettty3.0之探究FrameDecoder 带你了解解码器的原理

 

带着这个思路,我们就开始我们源码分析之旅吧!!,首先你应该明白,FrameDecoder是一个handler,应为netty中数据交个管道中的handler处理的,接受数据都会触发管道中的messageReceived方法,那么我们就从这里开始分析吧 ,先看它的messageReceived方法

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    Object m = e.getMessage();
    if (!(m instanceof ChannelBuffer)) { //不是channelBuffer型,不处理 
        ctx.sendUpstream(e); //直接发送给下一个handler
    } else {
        ChannelBuffer input = (ChannelBuffer)m;
        if (input.readable()) { //判断是否有数据需要处理
            if (this.cumulation == null) { //上次没有数据剩余
                try { //这里进行解码工作
                    this.callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); 

                } finally {
                    this.updateCumulation(ctx, input); //将剩余的数据缓存起来
                }
            } else {
                //进入到这里说明有数据剩余,需将读取的数据添加到缓存中
                input = this.appendToCumulation(input); 

                try {
                  //进行解码工作
                    this.callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
                } finally {
                 //将剩余没处理的数据缓存起来
                    this.updateCumulation(ctx, input);
                }
            }

        }
    }
}

注意没上面代码处理的流程思想跟我所给的流程思路图是一致的,参照我所给的流程图很容易搞懂上面的代码,那么我们就对其中的每一步进行具体的分析,首先让我们来看看callDecode方法也是关键所在

private void callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer 
cumulation, SocketAddress remoteAddress) throws Exception {
    while(true) { //一个无限循环
        if (cumulation.readable()) { //buffer可读

            int oldReaderIndex = cumulation.readerIndex(); //记录每次读的起始位置

            //这里进行具体的解码工作有用户来具体覆盖

            Object frame = this.decode(context, channel, cumulation);

            if (frame != null) { //不为空说明,解码出了对应数据

               //解码出数据读指针还没变?肯定用户写的方法出问题了 抛异常

                if (oldReaderIndex == cumulation.readerIndex()) { 

                    throw new IllegalStateException("decode() method must read at least 

one byte if it returned a frame (caused by: " + this.getClass() + ')');
                }
                //解码出数据了,向下一个handler传递
                this.unfoldAndFireMessageReceived(context, remoteAddress, frame);

                continue;
            }
             //走到这一步说明用户没有解码到所需数据,但抛弃了channelbuff中的部分数据进入下次循 
              //环后会退出                  
            if (oldReaderIndex != cumulation.readerIndex()) {

                continue;
            }
        }

        return; //啥都没变 说明数据没接受完全,直接返回不解码
    }
}

 对于callDecode 的每一步我都注释的很清楚了,还不明白的请自己解码的流程在纸上画一遍,解码完成后会进入updateCumulation,用来跟新缓存的数据,将未读取的数据保存起来,实现如下:

protected ChannelBuffer updateCumulation(ChannelHandlerContext ctx, ChannelBuffer input) {
    int readableBytes = input.readableBytes(); //判断是否还有数据可读

    ChannelBuffer newCumulation;

    if (readableBytes > 0) {

        int inputCapacity = input.capacity(); //获取其容量大小
           //剩余字节数,小于其容量,并且其容量超过了copyThreshold,则复制到新的cumlulation
           //这里不需要细究 与其优化有关
        if (readableBytes < inputCapacity && inputCapacity > this.copyThreshold) {

//申请一个新的channelbuffer,并让成员变量cumulation指向新的channelbuffer,
            this.cumulation = newCumulation = this.newCumulationBuffer(ctx, input.readableBytes()); 

            this.cumulation.writeBytes(input); //往其中添加未访问的数据

        } else if (input.readerIndex() != 0) {

              //slice函数是 将input读指针与写指针之间的数据复制到新的channelbuffer中
            this.cumulation = newCumulation = input.slice(); 

        } else { 
            //进入到这里说明input的读指针没变为0,没有读取数据,直接赋值即可
            newCumulation = input;

            this.cumulation = input;
        }
    } else { //进入这里说明没剩余数据
        newCumulation = null;

        this.cumulation = null;
    }

    return newCumulation;
}

很好奇上面为啥会有那么多判断呢,明明只需要缓存剩余数据,其实是为了节约内存,因为读取后读指针前面的可用空间已经读取了不需要缓存起来,上面做的是只缓存读指针与写指针之间的数据,好了接下来我们看最后一个方法appendToCumulation实现如下,超级简单

protected ChannelBuffer appendToCumulation(ChannelBuffer input) {
    ChannelBuffer cumulation = this.cumulation; //缓存的数据

    assert cumulation.readable(); //可读

    if (cumulation instanceof CompositeChannelBuffer) {

        CompositeChannelBuffer composite = (CompositeChannelBuffer)cumulation;
      //与优化有关 暂时不先管
        if (composite.numComponents() >= this.maxCumulationBufferComponents) {
            cumulation = composite.copy();
        }
    }

    this.cumulation = input  = ChannelBuffers.wrappedBuffer(new ChannelBuffer[]{cumulation, input}); //将数据添加到末尾
    return input;
}

自此 基本上每个方法的作用及其原理都给大家讲的差不多了,每个方法基本上都将的很清楚了,相信看下来对于解码器的原理你应该已近很清楚了。还每搞懂的小伙伴们跟着上面的流程在纸上画一遍,多看几次相信你很快也能搞懂的