Netty的心跳检测机制源码阅读

一.原理与源码阅读

Netty4.0提供了一个类,名为IdleStateHandler,这个类可以对三种类型的心跳检测。

....
    public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
        this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
    }
....

前三个的参数解释如下:

1)readerIdleTime:为读超时时间(即测试端一定时间内未接受到被测试端消息)

2)writerIdleTime:为写超时时间(即测试端一定时间内向被测试端发送消息)

3)allIdleTime:所有类型的超时时间

Netty的心跳检测机制源码阅读
idleState示例

我们在pipeLine中加入了IdleSateHandler,第一个参数是readerIdleTimeSeconds,数值为5,那么,在服务器端会每隔5秒来检查一下channelRead方法被调用的情况,如果在5秒内pipeLine中的channelRead方法都没有被触发,就会调用userEventTriggered方法。

下面我们看一下源码是怎么调用的...

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.initialize(ctx);
        super.channelActive(ctx);
    }

首先是channelActive,在channel**时,会触发初始化,下面是初始化的方法

  private void initialize(ChannelHandlerContext ctx) {
        switch(this.state) {
        case 1:
        case 2:
            return;
        default:
            this.state = 1;
            this.initOutputChanged(ctx);
            this.lastReadTime = this.lastWriteTime = this.ticksInNanos();
            if (this.readerIdleTimeNanos > 0L) {
               //注意这里
                this.readerIdleTimeout = this.schedule(ctx, new IdleStateHandler.ReaderIdleTimeoutTask(ctx), this.readerIdleTimeNanos, TimeUnit.NANOSECONDS);
            }

            if (this.writerIdleTimeNanos > 0L) {
                this.writerIdleTimeout = this.schedule(ctx, new IdleStateHandler.WriterIdleTimeoutTask(ctx), this.writerIdleTimeNanos, TimeUnit.NANOSECONDS);
            }

            if (this.allIdleTimeNanos > 0L) {
                this.allIdleTimeout = this.schedule(ctx, new IdleStateHandler.AllIdleTimeoutTask(ctx), this.allIdleTimeNanos, TimeUnit.NANOSECONDS);
            }

        }
    }

这里会根据传入的参数来添加一些定时任务,看一下ReaderIdleTimeoutTask的源码

  private final class ReaderIdleTimeoutTask extends IdleStateHandler.AbstractIdleTask {
        ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
            super(ctx);
        }

        protected void run(ChannelHandlerContext ctx) {
            long nextDelay = IdleStateHandler.this.readerIdleTimeNanos;
            if (!IdleStateHandler.this.reading) {
               //注意这里,用当前时间减去最后一次channelRead方法调用的时间,与nextDeay相减
                nextDelay -= IdleStateHandler.this.ticksInNanos() - IdleStateHandler.this.lastReadTime;
            }

            if (nextDelay <= 0L) {
                 //如果超时
                IdleStateHandler.this.readerIdleTimeout = IdleStateHandler.this.schedule(ctx, this, IdleStateHandler.this.readerIdleTimeNanos, TimeUnit.NANOSECONDS);
                boolean first = IdleStateHandler.this.firstReaderIdleEvent;
                IdleStateHandler.this.firstReaderIdleEvent = false;
                try {
                    //生成一个事件
                    IdleStateEvent event = IdleStateHandler.this.newIdleStateEvent(IdleState.READER_IDLE, first);
                   //调用channelIdle方法
                    IdleStateHandler.this.channelIdle(ctx, event);
                } catch (Throwable var6) {
                    ctx.fireExceptionCaught(var6);
                }
            } else {
                IdleStateHandler.this.readerIdleTimeout = IdleStateHandler.this.schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }

        }
    }

可以看到,用当前时间减去最后一次channelRead方法调用的时间,假如这个结果是6s,说明最后一次调用channelRead已经是6s之前的事情了,你设置的是5s,那么nextDelay则为-1,说明超时了,那么会触发channelIdle方法,下面看一下channelIdle的源码

    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

可以看到,会触发fireUserEventTriggered方法的执行,调用pipeLine中的UserEventTriggered方法。

如果没有超时,则会循环定时检测,除非你将IdleStateHandler移除Pipeline。

这就是IdleStateHandler的基本原理了。

二.代码示例

1.服务器端
AcceptorIdleStateTrigger 中对userEventTriggered方法重写,主要作用是,当服务器长时间没有接受到客户端数据,会抛出相应的异常,然后根据异常进行相应的处理,这里我后面的handler会捕获异常,然后将Channel关闭。

@ChannelHandler.Sharable
public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            IdleStateEvent stateEvent = (IdleStateEvent) evt;
            IdleState state = stateEvent.state();
            if (state==IdleState.READER_IDLE){
                throw new Exception("客户端空闲时间长,自动断开连接");
            }
        }else {
            super.userEventTriggered(ctx,evt);
        }
    }
}

HeartServerHandler 自定义的处理器

public class HeartServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Server Active");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Message message= (Message) msg;
        if (message.getType()==(byte)Integer.parseInt("CF", 16)){
            System.out.println(ctx.channel().remoteAddress()+">>>"+message.getType()+":"+message.getRequestId()+":"+message.getBody());
            ctx.writeAndFlush(new Message(message.getType(),message.getRequestId(),"server time->"+new Date()));
        }else if (message.getType()==(byte)Integer.parseInt("AF", 16)){
            System.out.println(ctx.channel().remoteAddress()+">>>>>>>>>>>>>>>>"+message.getBody());
        }
        ctx.fireChannelRead(msg);
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("已断开客户端"+ctx.channel().remoteAddress());
        ctx.channel().close();
    }
}

HeartServer 服务端代码

public class HeartServer {

    public static void main(String[] args){
        HeartServer server = new HeartServer();
        server.init(8080);
    }

    public void init (int port){

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);

        NioEventLoopGroup workerGroup = new NioEventLoopGroup(10);

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        AcceptorIdleStateTrigger idleStateTrigger = new AcceptorIdleStateTrigger();
        serverBootstrap.option(ChannelOption.SO_BACKLOG, 128).
                group(bossGroup,workerGroup).
                channel(NioServerSocketChannel.class).
                childOption(ChannelOption.SO_KEEPALIVE, true).
                childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                pipeline.addLast(new IdleStateHandler(5,0,0));
                pipeline.addLast(new MessageDecoder(Integer.MAX_VALUE,9,4,0,0,false));
                pipeline.addLast(new MessageEncoder());
                pipeline.addLast(idleStateTrigger);
                pipeline.addLast(new HeartServerHandler());
            }
        });

        try {
            ChannelFuture future = serverBootstrap.bind(port).sync();
            System.out.println("server start ...");
            //服务器同步连接断开时,这句代码才会往下执行
            future.channel().closeFuture().sync();
            System.out.println("server stop ...");
        } catch (InterruptedException e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            e.printStackTrace();
        }

    }
}

2.客户端
客户端长时间没有写数据,会触发下面的userEventTriggered方法,然后根据idle状态发送心跳包,保证客户端和服务器端的连接不会断开。

@ChannelHandler.Sharable
public class ConnectorIdleStateTrigger extends ChannelInboundHandlerAdapter {

    private final Message heartMsg = new Message((byte)Integer.parseInt("AF",16), System.currentTimeMillis(),"Heartbeat");

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            IdleStateEvent stateEvent = (IdleStateEvent) evt;
            IdleState state = stateEvent.state();
            if (state == IdleState.WRITER_IDLE){
                ctx.writeAndFlush(heartMsg);
            }
        }else{
            super.userEventTriggered(ctx, evt);
        }
    }
}

Netty的心跳检测机制源码阅读
心跳机制实现.png