【RPC高性能框架总结】10.手写rpc框架-代码实现(三)

接上一篇《9.手写rpc框架-代码实现(二)
上一篇我们编写了rpc-client客户端工程,其作用是使用Netty封装一个客户端网络层(RpcClient),以及rpc的服务动态代理(RpcProxy),即用来进行远程服务类加载和方法调用的代理类。
编写完客户端之后,我们就来编写服务端,来根据客户端发送的方法调用请求,调用相关方法并反馈方法返回的对象。

注:代码参考http://git.oschina.net/huangyong/rpc(作者:黄勇)

在MyEclipse新建名为rpc-server的maven工程:
【RPC高性能框架总结】10.手写rpc框架-代码实现(三)
【RPC高性能框架总结】10.手写rpc框架-代码实现(三)
【RPC高性能框架总结】10.手写rpc框架-代码实现(三)
新建成功之后,在POM中引入依赖,除了需要rpc-common工程进行编码解码,注册中心模块(这里是zookeeper)。还需要Spring加载类,需要cglib进行反射操作,调用相关类和方法,所以我们引入以下依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>
      <groupId>com.xxx.rpc</groupId>
      <artifactId>rpc-framework</artifactId>
      <version>0.0.1-SNAPSHOT</version>
    </parent>

    <artifactId>rpc-server</artifactId>

    <dependencies>
        <!-- Spring -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
        <!-- CGLib -->
        <dependency>
            <groupId>cglib</groupId>
            <artifactId>cglib</artifactId>
        </dependency>
        <!-- RPC Common -->
        <dependency>
            <groupId>com.xxx.rpc</groupId>
            <artifactId>rpc-common</artifactId>
            <version>${project.version}</version>
        </dependency>
        <!-- RPC Registry with ZooKeeper -->
        <!-- <dependency>
            <groupId>com.xxx.rpc</groupId>
            <artifactId>rpc-registry-zookeeper</artifactId>
            <version>${project.version}</version>
        </dependency> -->
    </dependencies>

</project>

因为我们这里还没有编写rpc-registry-zookeeper,所以可以暂时将其注释。
然后我们开始编写代码,首先在src/main/java中创建com.xxx.rpc.client包下的RpcServer、RpcServerHandler和RpcService类,其中RpcServer用来发送处理好的RPC请求,RpcServerHandler用来处理客户端的RPC请求,调用相关类的相关参数,并返回给客户端,RpcService是一个注解,用来标注在相关暴露给其它服务调用的RPC类上。首先是RpcService类,给那些需要暴露服务的类服务的注解类:

package com.xxx.rpc.server;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.stereotype.Component;

//RPC 服务注解(标注在服务实现类上)

//@Target用于定义在注解的上边,表明该注解可以使用的范围。(这里为Type,即可用于/接口、类、枚举、注解)
@Target({ElementType.TYPE})
//@Retention可以用来修饰注解,其中RetentionPolicy指定注解的生命周期
//这里RUNTIME指注解不仅被保存到class文件中,jvm加载class文件之后,仍然存在
@Retention(RetentionPolicy.RUNTIME)
@Component//添加该注解,该类可以被Spring扫描到
public @interface RpcService {

    /**
     * 服务接口类
     */
    Class<?> value();

    /**
     * 服务版本号
     */
    String version() default "";
}

这里的RpcService是一个@interface类型,即一个注解类型,我们需要将该类表达的注解标识在需要暴露给外部调用的服务实现类上。该类的@Target注解、@Retention注解以及@Component注解的意义已在上面源码注释中说明,在该类内容体中,定义了两个变量,一个是使用该注解类的实现类的接口的具体类型,一个是服务实现类的服务版本号。接着是RpcServer,我们编写发现服务和发送服务调用结果的逻辑:

package com.xxx.rpc.server;

import java.util.HashMap;
import java.util.Map;

import org.apache.commons.collections4.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;

import com.xxx.rpc.common.bean.RpcRequest;
import com.xxx.rpc.common.bean.RpcResponse;
import com.xxx.rpc.common.codec.RpcDecoder;
import com.xxx.rpc.common.codec.RpcEncoder;
import com.xxx.rpc.common.utils.StringUtil;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

//RPC 服务器,用于发布 RPC 服务
//实现ApplicationContextAware是为了获取该类所在的Spring容器,以操作spring容器及其中的Bean实例。
//实现InitializingBean(提供了初始化方法的方式)是为了在初始化Bean之前加一些逻辑(afterPropertiesSet方法中)
public class RpcServer implements ApplicationContextAware, InitializingBean{
    //日志对象
    private static final Logger LOGGER = LoggerFactory.getLogger(RpcServer.class);
    //服务地址
    private String serviceAddress;
    //服务地址注册类,该类由注册中心实现
    private ServiceRegistry serviceRegistry;

    /**
     * 存放 服务名 与 服务对象 之间的映射关系
     */
    private Map<String, Object> handlerMap = new HashMap<>();

    public RpcServer(String serviceAddress) {
        this.serviceAddress = serviceAddress;
    }

    public RpcServer(String serviceAddress, ServiceRegistry serviceRegistry) {
        this.serviceAddress = serviceAddress;
        this.serviceRegistry = serviceRegistry;
    }
    @Override
    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        //扫描带有RpcService注解的类,并初始化HandlerMap对象
        Map<String,Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class);
        if(MapUtils.isNotEmpty(serviceBeanMap)){
            //遍历带有RpcService注解的类
            for(Object serviceBean:serviceBeanMap.values()){
                //获取注解对象
                RpcService rpcService = serviceBean.getClass().getAnnotation(RpcService.class);
                //通过注解对象,获取其修饰参数(serviceName服务名称,serviceVersion服务版本)
                String serviceName = rpcService.value().getName();
                String serviceVersion = rpcService.version();
                if (StringUtil.isNotEmpty(serviceVersion)) {
                    serviceName += "-" + serviceVersion;//服务名与版本号拼接
                }
                handlerMap.put(serviceName, serviceBean);//存放进映射Map,供Handler处理类操作
            }
        }
    }
    
     @Override
     public void afterPropertiesSet() throws Exception {
         EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                // 创建并初始化 Netty 服务端 Bootstrap 对象
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup);
                bootstrap.channel(NioServerSocketChannel.class);
                bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast(new RpcDecoder(RpcRequest.class)); // 解码 RPC 请求
                        pipeline.addLast(new RpcEncoder(RpcResponse.class)); // 编码 RPC 响应
                        pipeline.addLast(new RpcServerHandler(handlerMap)); // 处理 RPC 请求
                    }
                });
                bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
                bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
                // 获取 RPC 服务器的 IP 地址与端口号
                String[] addressArray = StringUtil.split(serviceAddress, ":");
                String ip = addressArray[0];
                int port = Integer.parseInt(addressArray[1]);
                // 启动 RPC 服务器
                ChannelFuture future = bootstrap.bind(ip, port).sync();
                // 注册 RPC 服务地址
                if (serviceRegistry != null) {
                    for (String interfaceName : handlerMap.keySet()) {
                        serviceRegistry.register(interfaceName, serviceAddress);
                        LOGGER.debug("register service: {} => {}", interfaceName, serviceAddress);
                    }
                }
                LOGGER.debug("server started on port {}", port);
                // 关闭 RPC 服务器
                future.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
     }
    
    //注册中心服务注册对象,还没有实现,这里先放一个空的
    class ServiceRegistry{

        public String register(String interfaceName,String serviceAddress) {
            // TODO Auto-generated method stub
            return null;
        }
        
    }
}

RpcServer类的作用主要是发布RPC服务,该类主要实现了两个接口,分别是ApplicationContextAware和InitializingBean。
其中实现ApplicationContextAware是为了获取该类所在的Spring容器,以操作spring容器及其中的Bean实例,下面的setApplicationContext方法就是用来操作Spring容器中的类的,这里会扫描带有RpcService注解的类,然后将获取的这些类的信息(服务名,服务对象本身)保存进handlerMap,用于后面获取服务直接使用。
然后实现nitializingBean是为了在初始化Bean之前加一些逻辑,下面的afterPropertiesSet方法就是在初始化所有Bean之前调用的方法,因为我们要发布rpc服务,即将所有需要暴露给外部服务的服务实现类通过Netty发布出去,所以在所有Bean初始化之前,组装好的handlerMap中的服务发布出去,这里是注册到注册中心中。上面Netty发布服务的时候,调用了3个Handler,除了编码解码的RpcEncoder、RpcDecoder以外,就是即将要编写的RpcServerHandler,核心的业务处理器,该处理器主要用来处理客户端rpc请求的逻辑,代码如下:

package com.xxx.rpc.server;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.xxx.rpc.common.bean.RpcRequest;
import com.xxx.rpc.common.bean.RpcResponse;
import com.xxx.rpc.common.utils.StringUtil;

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import net.sf.cglib.reflect.FastClass;
import net.sf.cglib.reflect.FastMethod;

//RPC服务端处理器,用于处理RPC请求
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcRequest>{
    //日志对象
    private static final Logger LOGGER = LoggerFactory.getLogger(RpcServerHandler.class);
    
    //服务名以及相关服务类的Map对象
    private final Map<String,Object> handlerMap; 
    //构造方法获取相关Map对象
    public RpcServerHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }
    
    //处理rpc客户端发来的服务调用 响应
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {
        //创建并初始化RPC响应对象
        RpcResponse response = new RpcResponse();
        //反馈发来的独一无二的请求ID
        response.setRequestId(request.getRequestId());
        try {
            Object result = handle(request);
            response.setResult(result);
        } catch (Exception e) {
            //如果报错,首先记录日志,然后在返回对象中将报错信息放入
            LOGGER.error("handle result failure",e);
            response.setException(e);
        }
        //写入RPC响应对象并自动关闭连接
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }
    
    //拿到RPC客户端请求的服务信息,将相关服务(服务名、版本)加载出来,反馈给客户端
    private Object handle(RpcRequest request) throws Exception {
        //获取服务对象
        String serviceName = request.getInterfaceName();//获取请求的服务接口名
        String serviceVersion = request.getServiceVersion();//获取请求的服务版本
        if (StringUtil.isNotEmpty(serviceVersion)) {//如果版本不为空,则拼接上去
            serviceName += "-" + serviceVersion;
        }
        //从加载出来的所有服务类中找到客户端请求的serviceName对应的具体Bean
        Object serviceBean = handlerMap.get(serviceName);
        if (serviceBean == null) {//如果请求的相关类不存在,则进行报错
            throw new RuntimeException(String.format("can not find service bean by key: %s", serviceName));
        }
        //如果请求的相关类存在,则通过反射获取调用该类相关请求方法所需的参数
        Class<?> serviceClass = serviceBean.getClass();//获取目标类的类型
        String methodName = request.getMethodName();//获取请求调用的方法名
        Class<?>[] paramterTypes = request.getParamterTypes();//获取请求调用的参数类型
        Object[] paramters = request.getParameters();//获取请求调用的具体参数类
        //使用CGLib执行反射调用
        FastClass serviceFastClass = FastClass.create(serviceClass);
        FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, paramterTypes);
        return serviceFastMethod.invoke(serviceBean, paramters);//返回调用具体方法的结果
    }
    
    //读取响应时发生异常的处理方法
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        LOGGER.error("server caught exception", cause);
        ctx.close();
    }
}

该类的构造方法RpcServerHandler(Map<String, Object> handlerMap)就是用来接收前面RpcServer传来的已经获取好的暴露给外部的服务实现类Map------handlerMap。该类继承了SimpleChannelInboundHandler,即为一个InboundHandler,重写channelRead0方法来处理rpc客户端发来的服务调用,在channelRead0方法中,封装返回的RpcResponse,其中调用的handle内部方法,就是从handlerMap中获取客户端请求的服务名对应的类,然后使用CGLib执行反射调用客户端请求的方法,最后返回调用的结果。

至此,我们将rpc-server服务端编写完成。下一篇我们来编写服务注册中心,借此我们来理解服务是如何注册到服务中心,以及服务是从注册中心发现的。

转载请注明出处:https://blog.csdn.net/acmman/article/details/88914287