不惑之年的硬件牛人转到软件自学之netty框架(一)netty-异步和事件驱动

    由于近期开发一个中型的物联项目,带着十来个兄弟从底层硬件到无线局域通信到通用网关到netty高可用框架到spring cloud的后台开发到移动端APP开发到WEB前端的开发整体框架的搭建,虽然很辛苦,但我一直在给兄弟们说我们要三年内在物联行业占有一席之地,期待项目的成功。就因为这样,我写出了这个netty自学框架,虽然也是开始学习,我想通过项目的历练肯定对大家都有用,加油!

      今天是:2018年5月17日          主题:netty-异步和事件驱动

     一、什么是netty?

     netty 是一款异步的事件驱动的网络应用程序框架,支持快速的开发科维护的高性能的面向协议的服务器和客户端。

     二、为什么我这个项目要用netty?

     因为我们评估了很多框架,spring cloud虽然有rabbitmq的负载均衡,但对于物联网底层硬件的长连接接入数的高可用及高并发还是有限,不能很好的支持;其他的框架比如:阿里云的MQTT也支持100万级设备,但要钱;emqttd虽然也支持,但没法做持久化,要做持久化就是钱。。。。哎,什么都是钱 ,所以评估了很久,算了,还是自己来做,最终我们选择了netty框架封装mqtt的协议,为什么用mqtt,这个只要知道物联网的地球人都知道,如果只懂互联网的人就不要学了,因为未来是“物联网+”的时代,而不是“互联网+”的时代,互联网时代已经过去,门槛越来越低,只懂互联网必将会被淘汰!哎,不要说废话了!

     三、由阻塞I/O处理多个连接转到非阻塞I/O处理多个连接的必须性

               不惑之年的硬件牛人转到软件自学之netty框架(一)netty-异步和事件驱动

       以上是阻塞I/O处理多个连接的框图,要处理三个长连接就必须要新开一个线程。问题来了,第一、在任何时候都可能有大量的线程处于休眠状态,只是等待输入或者输出数据就绪。这可能算是一种资源浪费。第二、需要为每个线程的调用都分配内存。第三、即使JAVA虚拟机在物理上可以支持非常大数量的线程,但是远在达到极限之前,上下文切换所带来的开销会带来麻烦,例如,在达到1万个连接的时候;虽然这种并发方案对于支撑中小数量的客户端来说可算可以接受,但是为了支撑10万或100万的并发连接所需要的资源就不理想了。而我的方案框架目的就是需要支持到100万以上的硬件长连接数,做好高并发高可用的支持,因此就用到netty的JAVA NIO(new input/output)非阻塞I/O处理多个连接。

         不惑之年的硬件牛人转到软件自学之netty框架(一)netty-异步和事件驱动

       class java.nio.channels.Selector是JAVA的非阻塞I/O实现的关键。因为可以在任何的事件检查任意的读操作或者写操作的完成状态,所以以上图一个单一的线程可以处理多个并发的连接。使用selector的非阻塞I/O可以提供更好的资源管理:

        1、使用较少的线程便可以处理许多连接,因此也减少了内存管理和上下文切换所带来开销;

        2、当没有I/O操作需要处理的时候,线程也可以被用于其他任务。

       四、什么是“异步”,清楚的解释

       很容易理解的解释就是:你只有再已经问了一个问题之后才会得到一个和它对应的答案,而在你等待它的同时你也可以做点别的事情。

      五、Netty的核心组件:Channel、回调、Future、事件和ChannelHandler。

     1、Channel

     它代表一个到实体(如一个硬件设备、一个文件、一个网络套接字或者一个能够执行一个或者多个不同的I/O操作的程序组件)的开放连接,如读操作和写操作。

     目前,可以把Channel看作是传入(入站)或者传出(出站)数据的载体。因此,它可以被打开或者被关闭,连接或者断开连 接。

     2、回调

     一个回调其实就是一个方法,一个指向已经被提供给另一个方法的应用。这使得后者可以在适当的时候调用前者。Netty在内部使用了回调来处理时间;当一个回调被触发时,相关的事件可以被一个interface-ChannelHandler的实现处理。 

public class ConnectHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println(
                "Client " + ctx.channel().remoteAddress() + " connected");
    }
}

    以上代码就是:当一个新的连接被建立时,ChannelHandler的channelActive()回调方法将会被调用,并将打印出一条信息。

    3、Future

     提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。每个Netty的出站I/O操作都将返回一个ChannelFuture。

public class ConnectExample {
    private static final Channel CHANNEL_FROM_SOMEWHERE = new NioSocketChannel();

    /**
     * Listing 1.3 Asynchronous connect
     *
     * Listing 1.4 Callback in action
     * */
    public static void connect() {
        Channel channel = CHANNEL_FROM_SOMEWHERE; //reference form somewhere
        // Does not block
        ChannelFuture future = channel.connect(
                new InetSocketAddress("192.168.0.1", 25));
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    ByteBuf buffer = Unpooled.copiedBuffer(
                            "Hello", Charset.defaultCharset());
                    ChannelFuture wf = future.channel()
                            .writeAndFlush(buffer);
                    // ...
                } else {
                    Throwable cause = future.cause();
                    cause.printStackTrace();
                }
            }
        });

    }
}

    以上代码展示了一个ChannelFuture作为一个I/O操作的一部分返回的例子。这里,connect()方法将会直接返回,而不会阻塞,该调用将会在后台完成。因为线程不会阻塞以等待对应的操作完成,所以它可以同时做其他的工作,从而更加有效的利用资源。

异步地连接到远程节点:

ChannelFuture future = channel.connect(
        new InetSocketAddress("192.168.0.1", 25));

注册一个ChannelFutureListener以便在操作完成时获得通知

future.addListener(new ChannelFutureListener()

 如果操作成功,则创建一个ByteBuf以持有数据

public void operationComplete(ChannelFuture future) {
    if (future.isSuccess()) {
        ByteBuf buffer = Unpooled.copiedBuffer(
                "Hello", Charset.defaultCharset());

  将数据异步地发送到远程节点。返回一个ChannelFuture

  ChannelFuture wf = future.channel()
                .writeAndFlush(buffer);

如果发生错误,则访问描述原因的Throwable

Throwable cause = future.cause();
cause.printStackTrace();

4、事件和ChannelHandler

     Netty使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于已经发生的事件来触发适当的动作。这些动作可能是:记录日志、数据转换、流控制和应用程序逻辑。

     Netty是一个网络编程框架,所以事件是按照它们与入站或者相关的状态更改而触发的事件包括:连接已被**或者连接失活;数据读取;用户事件;错误事件。

     出站事件是未来将会触发的某个动作的操作结果,这些动作包括:打开或者关闭远程节点的连接;将数据写到或者冲刷到套接字。