不惑之年的硬件牛人转到软件自学之netty框架(二)我的第一个netty应用程序

由于近期开发一个中型的物联项目,带着十来个兄弟从底层硬件到无线局域通信到通用网关到netty高可用框架到spring cloud的后台开发到移动端APP开发到WEB前端的开发整体框架的搭建,虽然很辛苦,但我一直在给兄弟们说我们要三年内在物联行业占有一席之地,期待项目的成功。就因为这样,我写出了这个netty自学框架,虽然也是开始学习,我想通过项目的历练肯定对大家都有用,加油!

      今天是:2018年5月18日          主题:我的第一个netty应用程序

      这节我们将展示如何构建一个基于Netty的客户端和服务器。应用程序很简单:客户端将消息发送给服务器,而服务器再将消息回送给客户端。通过这个实例,我们可以做到两点:

     第一,它会提供一个测试台,用于设置和验证我的开发工具(IDEA)和环境(JDK1.8),可以通过这个示例代码的练习来为自己将来的开发工作做准备;

     第二,我将获得关于Netty的一个关键方面的实践经验。

     一、编写Echo客户端和服务器应用程序,所能够支持的客户端数量,理论上,仅受限于系统的可用资源(以及所使用的JDK版本可能会施加的限制)

                                 不惑之年的硬件牛人转到软件自学之netty框架(二)我的第一个netty应用程序

           上图看到,Echo客户端和服务器之间的交互是非常简单的;在客户端建立一连接之后,它会想服务器发送一个或多个消息,发过来,服务器又会将每个消息回送给客户端,充分体现了客户端/服务器系统中典型的请求-响应交互模式。

      二、编写Echo服务器

      所有的Netty服务器都需要以下两部分:

      第一、至少一个ChannelHandler-该组件实现了服务器对从客户端接收的数据的处理,即它的业务逻辑;

      第二、引导-这是配置服务器的启动代码。至少,它会将服务器绑定到它要监听连接请求的端口数。

      1、ChannelHandler和业务逻辑:“EchoServerHandler”

      ChannelHandler是一个接口族的父接口,它的实现负责接收并响应事件通知。在Netty应用程序中,所有的数据处理逻辑都包含在这些核心抽象的实现中。因为Echo服务器会响应传入的消息,所以它需要实现ChannelInboundHandler接口,用来定义响应入站事件的方法。这个简单的应用程序只需要用到少量的这些方法,所以继承ChannelInboundHandlerAdapter类就可以了。

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println(
                "Server received: " + in.toString(CharsetUtil.UTF_8));
        ctx.write(in);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)
            throws Exception {
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
        Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

以上方法:channelRead()-------对于每个传入的消息都要调用;channelReadComplete()-------通知ChannelInboundHandler最后一次对channelRead()的调用是当前批量读取中的最后一条消息;exceptionCaught()--------在读取操作期间,有异常抛出时会调用。

    @Sharable:标示一个ChannelHandler可以被多个Channel安全地共享

@Sharable

    将消息记录到控制台

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
    System.out.println(
            "Server received: " + in.toString(CharsetUtil.UTF_8));

   将接收到的消息写给发送者,而不冲刷出站消息

ctx.write(in);

将未决消息冲刷到远程节点,并且关闭该Channel

public void channelReadComplete(ChannelHandlerContext ctx)
        throws Exception {
    ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
            .addListener(ChannelFutureListener.CLOSE);
}

打印异常跟踪,并关闭该Channel

public void exceptionCaught(ChannelHandlerContext ctx,
    Throwable cause) {
    cause.printStackTrace();
    ctx.close();
}
2、引导服务器“EchoServer”

上面EchoServerHandler实现的是核心业务逻辑,这里我们将分析引导服务器,主要完成如下内容:

    第一、绑定到服务器将在其上监听并接受传入连接请求的端口;

    第二、配置Channel,以将有关的入站消息通知给EchoServerHandler实例

   完整代码如下:

public class EchoServer {
    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public static void main(String[] args)
        throws Exception {
        if (args.length != 1) {
            System.err.println("Usage: " + EchoServer.class.getSimpleName() +
                " <port>"
            );
            return;
        }
        int port = Integer.parseInt(args[0]);
        new EchoServer(port).start();
    }

    public void start() throws Exception {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                .channel(NioServerSocketChannel.class)
                .localAddress(new InetSocketAddress(port))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(serverHandler);
                    }
                });

            ChannelFuture f = b.bind().sync();
            System.out.println(EchoServer.class.getName() +
                " started and listening for connections on " + f.channel().localAddress());
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
}

首先,设置端口值(如果端口参数的格式不正确,则抛出一个异常)

if (args.length != 1) {
    System.err.println("Usage: " + EchoServer.class.getSimpleName() +
        " <port>"
    );
    return;
}

 其次,调用服务器的start()方法

int port = Integer.parseInt(args[0]);
new EchoServer(port).start();

第三、创建EventLoopGroup来接受和处理新的连接

EventLoopGroup group = new NioEventLoopGroup();

第四、创建ServerBootstrap

ServerBootstrap b = new ServerBootstrap();

第五、指定所使用的NIO传输Channel

.channel(NioServerSocketChannel.class)

第六、将本地地址设置为一个具有选定端口的InetSocketAddress

.localAddress(new InetSocketAddress(port))

第七、这点是管家,使用了一个特殊的类-ChannelInitializer,当一个新的连接被接受时,一个新的子Channel将会被创建,而ChannelInitializer将会把一个EchoServerHandler的实例添加到该Channel的ChannelPipeline中。

.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(serverHandler);
    }
});

第八、异步地绑定服务器;调用sync()方法阻塞等待直到绑定完成

ChannelFuture f = b.bind().sync();

第九、获取Channel的CloseFuture,并且阻塞当前线程直到它完成

f.channel().closeFuture().sync();

第十、关闭EventLoopGroup,释放所有的资源

group.shutdownGracefully().sync();

3、以上步骤看似有些复杂,但我们可以总结一下重要步骤:

  服务器的主要代码组件:

       EchoServerHandler实现了业务逻辑;

       main()方法引导了服务器;

  引导过程中所需要的步骤如下:

       创建一个ServerBootstrap的实例以引导和绑定服务器;

       创建并分配一个NioEventLoopGroup实例以进行事件的处理,如接受新连接以及读/写数据;

       指定服务器绑定的本地的InetSocketAddress;

       使用一个EchoServerHandler的实例初始化每一个新的Channel;

       调用ServerBootstrap.bind()方法以绑定服务器。

  三、编写Echo客户端

    1、重要步骤:

    (1)、为初始化客户端,创建了一个Bootstrap实例连接到服务器;

    (2)、为进行事件处理分配了一个NioEventLoopGroup实例,其中事件处理包括创建新的连接以及处理入站和出站数据;

    (3)、为服务器连接创建了一个InetSocketAddress实例;

    (4)、当连接被建立时,一个EchoClientHandler实例会被安装到ChannelPipeline中;

    (5)、当一切都设置完成后,调用Bootstrap.connect()方法连接到远程节点。

     编写客户端所涉及的两个主要代码部分也是业务逻辑和引导。和服务器端一样。

    2、客户端的ChannelHandler实现客户端逻辑

    (1)、标记该类的实例可以被多个Channel共享

@Sharable

     (2)、channelActive()表示在到服务器的连接已经建立之后将被调用,其确保了数据将会被尽可能快地写入服务器,在这个场景下是一个编码了字符串“Netty rocks!”的字节缓冲区

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!",
            CharsetUtil.UTF_8));
}

     (3)、channelRead0()表示从服务器接收到一条消息时被调用,每当接收数据时,都会调用这个方法。需要注意的是,由服务器发送的消息可能会被分块接收。也就是说,如果服务器发送了5个字节,那么不能保证这5个字节会被一次性接收。即使是对于这么少量的数据,channelRead0()方法也可能会被调用两次,第一次使用一个持有3字节的ByteBuf(Netty的字节容器),第二次使用一个持有2字节的ByteBuf。作为一个面向流的协议,TCP保证了字节数组将会按照服务器发送它们的顺序被接收。

@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
    System.out.println(
            "Client received: " + in.toString(CharsetUtil.UTF_8));
}

    (4)、exceptionCaught()表示在处理过程中引发异常时被调用,一旦出现异常将终止到服务器的连接。

@Override
public void exceptionCaught(ChannelHandlerContext ctx,
    Throwable cause) {
    cause.printStackTrace();
    ctx.close();
}

   3、引导客户端EchoClient

  (1)、使用Echo服务器的地址和端口连接

public EchoClient(String host, int port) {
    this.host = host;
    this.port = port;
}

   (2)、创建Bootstrap并指定EventLoopGroup以处理客户端事件;需要适用于NIO的实现,并建立Channel的传输类型

Bootstrap b = new Bootstrap();
b.group(group)
    .channel(NioSocketChannel.class)

    (3)、设置服务器的InetSocketAddress地址和端口;向ChannelPipeline中添加一个EchoClientHandler实例

.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch)
        throws Exception {
        ch.pipeline().addLast(
             new EchoClientHandler());
    }
});

    (4)、连接到远程节点,阻塞等待直到连接完成。完成后关闭Channel和线程池,并释放所有的资源

  ChannelFuture f = b.connect().sync();
    f.channel().closeFuture().sync();
} finally {
    group.shutdownGracefully().sync();
}

四、完成后就是关键的运行问题,这个花了我一些时间,主要在于需要终端输入参数args才能找到端口号运行,具体步骤如下:

1、进入服务器端的工程,选择IDEA菜单中的“RUN”->“Edit Configurations...”

                                       不惑之年的硬件牛人转到软件自学之netty框架(二)我的第一个netty应用程序

2、在“Program arguments:”就可以填入args的参数,当然要按照“args[0]、args[1].....”的顺序来做。输入args[0]为端口号:8085

                     不惑之年的硬件牛人转到软件自学之netty框架(二)我的第一个netty应用程序

3、运行主程序“EchoServer”即可跑起来

                     不惑之年的硬件牛人转到软件自学之netty框架(二)我的第一个netty应用程序

4、同理进入客户端的工程,在“RUN”->“Edit Configurations...”,输入本地IP:127.0.0.1 (args[0])和端口号:8085( args[1]),即可【注:下面的Use classpath of module里面要选为echo-client】:

               不惑之年的硬件牛人转到软件自学之netty框架(二)我的第一个netty应用程序


5、运行主程序“EchoClient”会打印出:“client received:Netty rocks!”的结果,表示已经成功

               不惑之年的硬件牛人转到软件自学之netty框架(二)我的第一个netty应用程序