nettty3.0之探究FrameDecoder 带你了解解码器的原理
使用过netty的人都知道,在使用netty的时候都或多或少碰到粘包或分包问题,这里就需要使用使用解码器了,那解码器如何工作的呢?现在就带大家探究一下,首次请看下面我画的一张frameDecoder流程图,因为源码就是按照这个思路写的。
带着这个思路,我们就开始我们源码分析之旅吧!!,首先你应该明白,FrameDecoder是一个handler,应为netty中数据交个管道中的handler处理的,接受数据都会触发管道中的messageReceived方法,那么我们就从这里开始分析吧 ,先看它的messageReceived方法
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { Object m = e.getMessage(); if (!(m instanceof ChannelBuffer)) { //不是channelBuffer型,不处理 ctx.sendUpstream(e); //直接发送给下一个handler } else { ChannelBuffer input = (ChannelBuffer)m; if (input.readable()) { //判断是否有数据需要处理 if (this.cumulation == null) { //上次没有数据剩余 try { //这里进行解码工作 this.callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); } finally { this.updateCumulation(ctx, input); //将剩余的数据缓存起来 } } else { //进入到这里说明有数据剩余,需将读取的数据添加到缓存中 input = this.appendToCumulation(input); try { //进行解码工作 this.callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); } finally { //将剩余没处理的数据缓存起来 this.updateCumulation(ctx, input); } } } } }
注意没上面代码处理的流程思想跟我所给的流程思路图是一致的,参照我所给的流程图很容易搞懂上面的代码,那么我们就对其中的每一步进行具体的分析,首先让我们来看看callDecode方法也是关键所在
private void callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception { while(true) { //一个无限循环 if (cumulation.readable()) { //buffer可读 int oldReaderIndex = cumulation.readerIndex(); //记录每次读的起始位置 //这里进行具体的解码工作有用户来具体覆盖 Object frame = this.decode(context, channel, cumulation); if (frame != null) { //不为空说明,解码出了对应数据 //解码出数据读指针还没变?肯定用户写的方法出问题了 抛异常 if (oldReaderIndex == cumulation.readerIndex()) { throw new IllegalStateException("decode() method must read at least one byte if it returned a frame (caused by: " + this.getClass() + ')'); } //解码出数据了,向下一个handler传递 this.unfoldAndFireMessageReceived(context, remoteAddress, frame); continue; } //走到这一步说明用户没有解码到所需数据,但抛弃了channelbuff中的部分数据进入下次循 //环后会退出 if (oldReaderIndex != cumulation.readerIndex()) { continue; } } return; //啥都没变 说明数据没接受完全,直接返回不解码 } }
对于callDecode 的每一步我都注释的很清楚了,还不明白的请自己解码的流程在纸上画一遍,解码完成后会进入updateCumulation,用来跟新缓存的数据,将未读取的数据保存起来,实现如下:
protected ChannelBuffer updateCumulation(ChannelHandlerContext ctx, ChannelBuffer input) { int readableBytes = input.readableBytes(); //判断是否还有数据可读 ChannelBuffer newCumulation; if (readableBytes > 0) { int inputCapacity = input.capacity(); //获取其容量大小 //剩余字节数,小于其容量,并且其容量超过了copyThreshold,则复制到新的cumlulation //这里不需要细究 与其优化有关 if (readableBytes < inputCapacity && inputCapacity > this.copyThreshold) { //申请一个新的channelbuffer,并让成员变量cumulation指向新的channelbuffer, this.cumulation = newCumulation = this.newCumulationBuffer(ctx, input.readableBytes()); this.cumulation.writeBytes(input); //往其中添加未访问的数据 } else if (input.readerIndex() != 0) { //slice函数是 将input读指针与写指针之间的数据复制到新的channelbuffer中 this.cumulation = newCumulation = input.slice(); } else { //进入到这里说明input的读指针没变为0,没有读取数据,直接赋值即可 newCumulation = input; this.cumulation = input; } } else { //进入这里说明没剩余数据 newCumulation = null; this.cumulation = null; } return newCumulation; }
很好奇上面为啥会有那么多判断呢,明明只需要缓存剩余数据,其实是为了节约内存,因为读取后读指针前面的可用空间已经读取了不需要缓存起来,上面做的是只缓存读指针与写指针之间的数据,好了接下来我们看最后一个方法appendToCumulation实现如下,超级简单
protected ChannelBuffer appendToCumulation(ChannelBuffer input) { ChannelBuffer cumulation = this.cumulation; //缓存的数据 assert cumulation.readable(); //可读 if (cumulation instanceof CompositeChannelBuffer) { CompositeChannelBuffer composite = (CompositeChannelBuffer)cumulation; //与优化有关 暂时不先管 if (composite.numComponents() >= this.maxCumulationBufferComponents) { cumulation = composite.copy(); } } this.cumulation = input = ChannelBuffers.wrappedBuffer(new ChannelBuffer[]{cumulation, input}); //将数据添加到末尾 return input; } 自此 基本上每个方法的作用及其原理都给大家讲的差不多了,每个方法基本上都将的很清楚了,相信看下来对于解码器的原理你应该已近很清楚了。还每搞懂的小伙伴们跟着上面的流程在纸上画一遍,多看几次相信你很快也能搞懂的