不惑之年的硬件牛人转到软件自学之netty框架(一)netty-异步和事件驱动
由于近期开发一个中型的物联项目,带着十来个兄弟从底层硬件到无线局域通信到通用网关到netty高可用框架到spring cloud的后台开发到移动端APP开发到WEB前端的开发整体框架的搭建,虽然很辛苦,但我一直在给兄弟们说我们要三年内在物联行业占有一席之地,期待项目的成功。就因为这样,我写出了这个netty自学框架,虽然也是开始学习,我想通过项目的历练肯定对大家都有用,加油!
今天是:2018年5月17日 主题:netty-异步和事件驱动
一、什么是netty?
netty 是一款异步的事件驱动的网络应用程序框架,支持快速的开发科维护的高性能的面向协议的服务器和客户端。
二、为什么我这个项目要用netty?
因为我们评估了很多框架,spring cloud虽然有rabbitmq的负载均衡,但对于物联网底层硬件的长连接接入数的高可用及高并发还是有限,不能很好的支持;其他的框架比如:阿里云的MQTT也支持100万级设备,但要钱;emqttd虽然也支持,但没法做持久化,要做持久化就是钱。。。。哎,什么都是钱 ,所以评估了很久,算了,还是自己来做,最终我们选择了netty框架封装mqtt的协议,为什么用mqtt,这个只要知道物联网的地球人都知道,如果只懂互联网的人就不要学了,因为未来是“物联网+”的时代,而不是“互联网+”的时代,互联网时代已经过去,门槛越来越低,只懂互联网必将会被淘汰!哎,不要说废话了!
三、由阻塞I/O处理多个连接转到非阻塞I/O处理多个连接的必须性
以上是阻塞I/O处理多个连接的框图,要处理三个长连接就必须要新开一个线程。问题来了,第一、在任何时候都可能有大量的线程处于休眠状态,只是等待输入或者输出数据就绪。这可能算是一种资源浪费。第二、需要为每个线程的调用都分配内存。第三、即使JAVA虚拟机在物理上可以支持非常大数量的线程,但是远在达到极限之前,上下文切换所带来的开销会带来麻烦,例如,在达到1万个连接的时候;虽然这种并发方案对于支撑中小数量的客户端来说可算可以接受,但是为了支撑10万或100万的并发连接所需要的资源就不理想了。而我的方案框架目的就是需要支持到100万以上的硬件长连接数,做好高并发高可用的支持,因此就用到netty的JAVA NIO(new input/output)非阻塞I/O处理多个连接。
class java.nio.channels.Selector是JAVA的非阻塞I/O实现的关键。因为可以在任何的事件检查任意的读操作或者写操作的完成状态,所以以上图一个单一的线程可以处理多个并发的连接。使用selector的非阻塞I/O可以提供更好的资源管理:
1、使用较少的线程便可以处理许多连接,因此也减少了内存管理和上下文切换所带来开销;
2、当没有I/O操作需要处理的时候,线程也可以被用于其他任务。
四、什么是“异步”,清楚的解释
很容易理解的解释就是:你只有再已经问了一个问题之后才会得到一个和它对应的答案,而在你等待它的同时你也可以做点别的事情。
五、Netty的核心组件:Channel、回调、Future、事件和ChannelHandler。
1、Channel
它代表一个到实体(如一个硬件设备、一个文件、一个网络套接字或者一个能够执行一个或者多个不同的I/O操作的程序组件)的开放连接,如读操作和写操作。
目前,可以把Channel看作是传入(入站)或者传出(出站)数据的载体。因此,它可以被打开或者被关闭,连接或者断开连 接。
2、回调
一个回调其实就是一个方法,一个指向已经被提供给另一个方法的应用。这使得后者可以在适当的时候调用前者。Netty在内部使用了回调来处理时间;当一个回调被触发时,相关的事件可以被一个interface-ChannelHandler的实现处理。
public class ConnectHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println( "Client " + ctx.channel().remoteAddress() + " connected"); } }
以上代码就是:当一个新的连接被建立时,ChannelHandler的channelActive()回调方法将会被调用,并将打印出一条信息。
3、Future
提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。每个Netty的出站I/O操作都将返回一个ChannelFuture。
public class ConnectExample { private static final Channel CHANNEL_FROM_SOMEWHERE = new NioSocketChannel(); /** * Listing 1.3 Asynchronous connect * * Listing 1.4 Callback in action * */ public static void connect() { Channel channel = CHANNEL_FROM_SOMEWHERE; //reference form somewhere // Does not block ChannelFuture future = channel.connect( new InetSocketAddress("192.168.0.1", 25)); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { if (future.isSuccess()) { ByteBuf buffer = Unpooled.copiedBuffer( "Hello", Charset.defaultCharset()); ChannelFuture wf = future.channel() .writeAndFlush(buffer); // ... } else { Throwable cause = future.cause(); cause.printStackTrace(); } } }); } }
以上代码展示了一个ChannelFuture作为一个I/O操作的一部分返回的例子。这里,connect()方法将会直接返回,而不会阻塞,该调用将会在后台完成。因为线程不会阻塞以等待对应的操作完成,所以它可以同时做其他的工作,从而更加有效的利用资源。
异步地连接到远程节点:
ChannelFuture future = channel.connect( new InetSocketAddress("192.168.0.1", 25));
注册一个ChannelFutureListener以便在操作完成时获得通知
future.addListener(new ChannelFutureListener()
如果操作成功,则创建一个ByteBuf以持有数据
public void operationComplete(ChannelFuture future) { if (future.isSuccess()) { ByteBuf buffer = Unpooled.copiedBuffer( "Hello", Charset.defaultCharset());
将数据异步地发送到远程节点。返回一个ChannelFuture
ChannelFuture wf = future.channel() .writeAndFlush(buffer);
如果发生错误,则访问描述原因的Throwable
Throwable cause = future.cause(); cause.printStackTrace();
4、事件和ChannelHandler
Netty使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于已经发生的事件来触发适当的动作。这些动作可能是:记录日志、数据转换、流控制和应用程序逻辑。
Netty是一个网络编程框架,所以事件是按照它们与入站或者相关的状态更改而触发的事件包括:连接已被**或者连接失活;数据读取;用户事件;错误事件。
出站事件是未来将会触发的某个动作的操作结果,这些动作包括:打开或者关闭远程节点的连接;将数据写到或者冲刷到套接字。