基于Netty 重构RPC 框架

1、RPC 概述

基于Netty 重构RPC 框架

    随着用户量增加,当访问量逐渐增大,单一应用增加机器,带来的加速度越来越小,我们需要将应用拆分成互不
干扰的几个应用,以提升效率,于是就出现了垂直应用架构。MVC 架构就是一种非常经典的用于加速前端页面开发的
架构。
当垂直应用越来越多,应用之间交互不可避免,将核心业务抽取出来,作为独立的服逐渐形成稳定的服务中心,
使前端应用能更快速的响应,多变的市场需求,就出现了分布式服务架构。分布式架构下服务数量逐渐增加,为了提
高管理效率,RPC 框架应运而生。RPC 用于提高业务复用及整合的,分布式服务框架下RPC 是关键。
下一代框架,将会是流动计算架构占据主流。当服务越来越多,容量的评估,小服务的资源浪费等问题,逐渐明
显。此时,需要增加一个调度中心,基于访问压力实时管理集群容量,提高集群利用率。SOA 架构就是用于提高及其
利用率的,资源调度和治理中心SOA 是关键。
Netty 基本上是作为架构的技术底层而存在的,主要完成高性能的网络通信。
 

1.2

在没有RPC 框架以前,接口的调用完全没有规律可循,想怎么调,就怎么调。这导致业务发展到一定阶段之后,对接口
的维护变得非常困难。于是有人提出了服务治理的概念。所有服务间不允许直接调用,而是先到注册中心进行登记,
再由注册中心统一协调和管理所有服务的状态并对外发布,调用者只需要记住服务名称,去找注册中心获取服务即可。比如:我们打电话,所有电话号码有运营商分配。我们需要和某一个人通话时,只需要拨通对方的号码,运营商(注册中心,如中国移动、中国联通、中国电信)就会帮我们将信号转接过去。

目前流行的RPC 服务治理框架主要有Dubbo 和Spring Cloud,下面我们以比较经典的Dubbo 为例。Dubbo 核
心模块主要有四个:Registry 注册中心、Provider 服务端、Consumer 消费端、Monitor 监控中心,如下图所示: 

基于Netty 重构RPC 框架

 

1.3 相关概念

 netty解决了技术的和体系结构的领域的问题。首先,它的基于Java NIO的异步的和事件驱动的实现,保证了高负载下应用程序性能的最大化和可伸缩性。其次,netty也包含了一组设计模式,将应用程序逻辑从网络层解耦,简化了开发过程,同时也最大限度地提高了可测试性、模块化及代码的可重用性。

Channel – Socket
EventLoop – 控制流、多线程处理、并发
ChannelFuture – 异步通知
Channel接口
  基本的I/O操作(bind()、connect()、read()和writer())。
  EventLoop定义了Netty的核心抽象,用于处理连接的生命周期中所发生的事件。
这些关系是:

一个EventLoopGroup包含一个或者多个EventLoop
一个EventLoop在它的生命周期中只和一个Thread绑定
所有的EventLoop处理的I/O事件都将在它专有的Thread上被处理
一个Channel在它的生命周期中只注册于一个EventLoop
一个EventLoop可能会分配给一个或者多个Channel
ChannelFuture接口
  netty中所有的I/O操作都是异步的。因为一个操作可能不会立即返回,所以我们需要一种用于在之后的某个时间点确定其结果的方法。为此, Netty 提供了
ChannelFuture 接口,其 addListener()方法注册了一个 ChannelFutureListener,以便在某个操作完成时(无论是否成功)得到通知。
ChannelHandler接口
  对于应用程序开发人员来说,netty主要组件就是ChannelHandler,它充当了所有处理入站和出站数据的应用程序逻辑的容器。

ChannelPipeline接口
  ChannelPipeline提供了ChannelHandler链的容器,并定义了在该站上传播入站和出战事件流的API。当Channel被创建时,它会被自动分配到它专属的ChannelPipeline。
  ChannelHandler 安装到 ChannelPipeline 中的过程如下所示:

一个ChannelInitializer的实现被注册到了ServerBootstrap中
当ChannelInitializer.initChannel方法被调用时ChannelInitializer会在ChannelPipeline中安装一组自定义的ChannelHandler
ChannelInitializer将自己从ChannelPipeline中移除

2关键代码

2.1 RpcRegistry类

Registry 注册中心主要功能就是负责将所有Provider 的服务名称和服务引用地址注册到一个容器中,并对外发布。
Registry 应该要启动一个对外的服务,很显然应该作为服务端,并提供一个对外可以访问的端口。

public class RpcRegistry {  
    private int port;  
    public RpcRegistry(int port){  
        this.port = port;  
    }  
    public void start(){  
        EventLoopGroup bossGroup = new NioEventLoopGroup();  
        EventLoopGroup workerGroup = new NioEventLoopGroup();  
          
        try {  
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)  
                    .childHandler(new ChannelInitializer<SocketChannel>() {
  
                        @Override  
                        protected void initChannel(SocketChannel ch) throws Exception {  
                            ChannelPipeline pipeline = ch.pipeline();
                            //自定义协议解码器
                            /** 入参有5个,分别解释如下
                             maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。
                             lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置
                             lengthFieldLength:长度字段的长度。如:长度字段是int型表示,那么这个值就是4(long型就是8)
                             lengthAdjustment:要添加到长度字段值的补偿值
                             initialBytesToStrip:从解码帧中去除的第一个字节数
                             */
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                            //自定义协议编码器
                            pipeline.addLast(new LengthFieldPrepender(4));
                            //对象参数类型编码器
                            pipeline.addLast("encoder",new ObjectEncoder());
                            //对象参数类型解码器
                            pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE,ClassResolvers.cacheDisabled(null)));
                            pipeline.addLast(new RegistryHandler());
                        }  
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)       
                    .childOption(ChannelOption.SO_KEEPALIVE, true);  
            ChannelFuture future = b.bind(port).sync();      
            System.out.println("GP RPC Registry start listen at " + port );
            future.channel().closeFuture().sync();    
        } catch (Exception e) {  
             bossGroup.shutdownGracefully();    
             workerGroup.shutdownGracefully();  
        }  
    }public class RpcRegistry {  
    private int port;  
    public RpcRegistry(int port){  
        this.port = port;  
    }  
    public void start(){  
        EventLoopGroup bossGroup = new NioEventLoopGroup();  
        EventLoopGroup workerGroup = new NioEventLoopGroup();  
          
        try {  
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)  
                    .childHandler(new ChannelInitializer<SocketChannel>() {
  
                        @Override  
                        protected void initChannel(SocketChannel ch) throws Exception {  
                            ChannelPipeline pipeline = ch.pipeline();
                            //自定义协议解码器
                            /** 入参有5个,分别解释如下
                             maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。
                             lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置
                             lengthFieldLength:长度字段的长度。如:长度字段是int型表示,那么这个值就是4(long型就是8)
                             lengthAdjustment:要添加到长度字段值的补偿值
                             initialBytesToStrip:从解码帧中去除的第一个字节数
                             */
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                            //自定义协议编码器
                            pipeline.addLast(new LengthFieldPrepender(4));
                            //对象参数类型编码器
                            pipeline.addLast("encoder",new ObjectEncoder());
                            //对象参数类型解码器
                            pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE,ClassResolvers.cacheDisabled(null)));
                            pipeline.addLast(new RegistryHandler());
                        }  
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)       
                    .childOption(ChannelOption.SO_KEEPALIVE, true);  
            ChannelFuture future = b.bind(port).sync();      
            System.out.println("GP RPC Registry start listen at " + port );
            future.channel().closeFuture().sync();    
        } catch (Exception e) {  
             bossGroup.shutdownGracefully();    
             workerGroup.shutdownGracefully();  
        }  
    }

2.2RegistryHandler,类中实现注册的具体逻辑,上面的代码,主要实现服务注册和服务调用的功能。因为所有模块创
建在同一个项目中,为了简化,服务端没有采用远程调用,而是直接扫描本地Class,然后利用反射调用。

public class RegistryHandler  extends ChannelInboundHandlerAdapter {

    //用保存所有可用的服务
    public static ConcurrentHashMap<String, Object> registryMap = new ConcurrentHashMap<String,Object>();

    //保存所有相关的服务类
    private List<String> classNames = new ArrayList<String>();
    
    public RegistryHandler(){
        //完成递归扫描
        scannerClass("com.gupaoedu.vip.netty.rpc.provider");
        doRegister();
    }
    
    
    @Override    
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Object result = new Object();
        InvokerProtocol request = (InvokerProtocol)msg;

        //当客户端建立连接时,需要从自定义协议中获取信息,拿到具体的服务和实参
        //使用反射调用
        if(registryMap.containsKey(request.getClassName())){ 
            Object clazz = registryMap.get(request.getClassName());
            Method method = clazz.getClass().getMethod(request.getMethodName(), request.getParames());    
            result = method.invoke(clazz, request.getValues());   
        }
        ctx.write(result);  
        ctx.flush();    
        ctx.close();  
    }
    
    @Override    
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {    
         cause.printStackTrace();    
         ctx.close();    
    }
    

    /*
     * 递归扫描
     */
    private void scannerClass(String packageName){
        URL url = this.getClass().getClassLoader().getResource(packageName.replaceAll("\\.", "/"));
        File dir = new File(url.getFile());
        for (File file : dir.listFiles()) {
            //如果是一个文件夹,继续递归
            if(file.isDirectory()){
                scannerClass(packageName + "." + file.getName());
            }else{
                classNames.add(packageName + "." + file.getName().replace(".class", "").trim());
            }
        }
    }

    /**
     * 完成注册
     */
    private void doRegister(){
        if(classNames.size() == 0){ return; }
        for (String className : classNames) {
            try {
                Class<?> clazz = Class.forName(className);
                Class<?> i = clazz.getInterfaces()[0];
                registryMap.put(i.getName(), clazz.newInstance());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

2.3RpcProxy类基于Netty 重构RPC 框架

public class RpcProxy {  
    
    public static <T> T create(Class<?> clazz){
        //clazz传进来本身就是interface
        MethodProxy proxy = new MethodProxy(clazz);
        Class<?> [] interfaces = clazz.isInterface() ?
                                new Class[]{clazz} :
                                clazz.getInterfaces();
        T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(),interfaces,proxy);
        return result;
    }

    private static class MethodProxy implements InvocationHandler {
        private Class<?> clazz;
        public MethodProxy(Class<?> clazz){
            this.clazz = clazz;
        }

        public Object invoke(Object proxy, Method method, Object[] args)  throws Throwable {
            //如果传进来是一个已实现的具体类(本次演示略过此逻辑)
            if (Object.class.equals(method.getDeclaringClass())) {
                try {
                    return method.invoke(this, args);
                } catch (Throwable t) {
                    t.printStackTrace();
                }
                //如果传进来的是一个接口(核心)
            } else {
                return rpcInvoke(proxy,method, args);
            }
            return null;
        }


        /**
         * 实现接口的核心方法
         * @param method
         * @param args
         * @return
         */
        public Object rpcInvoke(Object proxy,Method method,Object[] args){

            //传输协议封装
            InvokerProtocol msg = new InvokerProtocol();
            msg.setClassName(this.clazz.getName());
            msg.setMethodName(method.getName());
            msg.setValues(args);
            msg.setParames(method.getParameterTypes());

            final RpcProxyHandler consumerHandler = new RpcProxyHandler();
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                //自定义协议解码器
                                /** 入参有5个,分别解释如下
                                 maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。
                                 lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置
                                 lengthFieldLength:长度字段的长度:如:长度字段是int型表示,那么这个值就是4(long型就是8)
                                 lengthAdjustment:要添加到长度字段值的补偿值
                                 initialBytesToStrip:从解码帧中去除的第一个字节数
                                 */
                                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                                //自定义协议编码器
                                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                                //对象参数类型编码器
                                pipeline.addLast("encoder", new ObjectEncoder());
                                //对象参数类型解码器
                                pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                                pipeline.addLast("handler",consumerHandler);
                            }
                        });

                ChannelFuture future = b.connect("localhost", 8080).sync();
                future.channel().writeAndFlush(msg).sync();
                future.channel().closeFuture().sync();
            } catch(Exception e){
                e.printStackTrace();
            }finally {
                group.shutdownGracefully();
            }
            return consumerHandler.getResponse();
        }

    }

2.4RpcProxyHandler类

public class RpcProxyHandler extends ChannelInboundHandlerAdapter {  
      
    private Object response;    
      
    public Object getResponse() {    
        return response;    
    }    
  
    @Override    
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {    
        response = msg;
    }    
        
    @Override    
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {    
        System.out.println("client exception is general");    
    }