基于Netty 重构RPC 框架
1、RPC 概述
随着用户量增加,当访问量逐渐增大,单一应用增加机器,带来的加速度越来越小,我们需要将应用拆分成互不
干扰的几个应用,以提升效率,于是就出现了垂直应用架构。MVC 架构就是一种非常经典的用于加速前端页面开发的
架构。
当垂直应用越来越多,应用之间交互不可避免,将核心业务抽取出来,作为独立的服逐渐形成稳定的服务中心,
使前端应用能更快速的响应,多变的市场需求,就出现了分布式服务架构。分布式架构下服务数量逐渐增加,为了提
高管理效率,RPC 框架应运而生。RPC 用于提高业务复用及整合的,分布式服务框架下RPC 是关键。
下一代框架,将会是流动计算架构占据主流。当服务越来越多,容量的评估,小服务的资源浪费等问题,逐渐明
显。此时,需要增加一个调度中心,基于访问压力实时管理集群容量,提高集群利用率。SOA 架构就是用于提高及其
利用率的,资源调度和治理中心SOA 是关键。
Netty 基本上是作为架构的技术底层而存在的,主要完成高性能的网络通信。
1.2
在没有RPC 框架以前,接口的调用完全没有规律可循,想怎么调,就怎么调。这导致业务发展到一定阶段之后,对接口
的维护变得非常困难。于是有人提出了服务治理的概念。所有服务间不允许直接调用,而是先到注册中心进行登记,
再由注册中心统一协调和管理所有服务的状态并对外发布,调用者只需要记住服务名称,去找注册中心获取服务即可。比如:我们打电话,所有电话号码有运营商分配。我们需要和某一个人通话时,只需要拨通对方的号码,运营商(注册中心,如中国移动、中国联通、中国电信)就会帮我们将信号转接过去。
目前流行的RPC 服务治理框架主要有Dubbo 和Spring Cloud,下面我们以比较经典的Dubbo 为例。Dubbo 核
心模块主要有四个:Registry 注册中心、Provider 服务端、Consumer 消费端、Monitor 监控中心,如下图所示:
1.3 相关概念
netty解决了技术的和体系结构的领域的问题。首先,它的基于Java NIO的异步的和事件驱动的实现,保证了高负载下应用程序性能的最大化和可伸缩性。其次,netty也包含了一组设计模式,将应用程序逻辑从网络层解耦,简化了开发过程,同时也最大限度地提高了可测试性、模块化及代码的可重用性。
Channel – Socket
EventLoop – 控制流、多线程处理、并发
ChannelFuture – 异步通知
Channel接口
基本的I/O操作(bind()、connect()、read()和writer())。
EventLoop定义了Netty的核心抽象,用于处理连接的生命周期中所发生的事件。
这些关系是:
一个EventLoopGroup包含一个或者多个EventLoop
一个EventLoop在它的生命周期中只和一个Thread绑定
所有的EventLoop处理的I/O事件都将在它专有的Thread上被处理
一个Channel在它的生命周期中只注册于一个EventLoop
一个EventLoop可能会分配给一个或者多个Channel
ChannelFuture接口
netty中所有的I/O操作都是异步的。因为一个操作可能不会立即返回,所以我们需要一种用于在之后的某个时间点确定其结果的方法。为此, Netty 提供了
ChannelFuture 接口,其 addListener()方法注册了一个 ChannelFutureListener,以便在某个操作完成时(无论是否成功)得到通知。
ChannelHandler接口
对于应用程序开发人员来说,netty主要组件就是ChannelHandler,它充当了所有处理入站和出站数据的应用程序逻辑的容器。
ChannelPipeline接口
ChannelPipeline提供了ChannelHandler链的容器,并定义了在该站上传播入站和出战事件流的API。当Channel被创建时,它会被自动分配到它专属的ChannelPipeline。
ChannelHandler 安装到 ChannelPipeline 中的过程如下所示:
一个ChannelInitializer的实现被注册到了ServerBootstrap中
当ChannelInitializer.initChannel方法被调用时ChannelInitializer会在ChannelPipeline中安装一组自定义的ChannelHandler
ChannelInitializer将自己从ChannelPipeline中移除
2关键代码
2.1 RpcRegistry类
Registry 注册中心主要功能就是负责将所有Provider 的服务名称和服务引用地址注册到一个容器中,并对外发布。
Registry 应该要启动一个对外的服务,很显然应该作为服务端,并提供一个对外可以访问的端口。
public class RpcRegistry {
private int port;
public RpcRegistry(int port){
this.port = port;
}
public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//自定义协议解码器
/** 入参有5个,分别解释如下
maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。
lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置
lengthFieldLength:长度字段的长度。如:长度字段是int型表示,那么这个值就是4(long型就是8)
lengthAdjustment:要添加到长度字段值的补偿值
initialBytesToStrip:从解码帧中去除的第一个字节数
*/
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
//自定义协议编码器
pipeline.addLast(new LengthFieldPrepender(4));
//对象参数类型编码器
pipeline.addLast("encoder",new ObjectEncoder());
//对象参数类型解码器
pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE,ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new RegistryHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = b.bind(port).sync();
System.out.println("GP RPC Registry start listen at " + port );
future.channel().closeFuture().sync();
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}public class RpcRegistry {
private int port;
public RpcRegistry(int port){
this.port = port;
}
public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//自定义协议解码器
/** 入参有5个,分别解释如下
maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。
lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置
lengthFieldLength:长度字段的长度。如:长度字段是int型表示,那么这个值就是4(long型就是8)
lengthAdjustment:要添加到长度字段值的补偿值
initialBytesToStrip:从解码帧中去除的第一个字节数
*/
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
//自定义协议编码器
pipeline.addLast(new LengthFieldPrepender(4));
//对象参数类型编码器
pipeline.addLast("encoder",new ObjectEncoder());
//对象参数类型解码器
pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE,ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new RegistryHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = b.bind(port).sync();
System.out.println("GP RPC Registry start listen at " + port );
future.channel().closeFuture().sync();
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
2.2RegistryHandler,类中实现注册的具体逻辑,上面的代码,主要实现服务注册和服务调用的功能。因为所有模块创
建在同一个项目中,为了简化,服务端没有采用远程调用,而是直接扫描本地Class,然后利用反射调用。
public class RegistryHandler extends ChannelInboundHandlerAdapter {
//用保存所有可用的服务
public static ConcurrentHashMap<String, Object> registryMap = new ConcurrentHashMap<String,Object>();
//保存所有相关的服务类
private List<String> classNames = new ArrayList<String>();
public RegistryHandler(){
//完成递归扫描
scannerClass("com.gupaoedu.vip.netty.rpc.provider");
doRegister();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Object result = new Object();
InvokerProtocol request = (InvokerProtocol)msg;
//当客户端建立连接时,需要从自定义协议中获取信息,拿到具体的服务和实参
//使用反射调用
if(registryMap.containsKey(request.getClassName())){
Object clazz = registryMap.get(request.getClassName());
Method method = clazz.getClass().getMethod(request.getMethodName(), request.getParames());
result = method.invoke(clazz, request.getValues());
}
ctx.write(result);
ctx.flush();
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/*
* 递归扫描
*/
private void scannerClass(String packageName){
URL url = this.getClass().getClassLoader().getResource(packageName.replaceAll("\\.", "/"));
File dir = new File(url.getFile());
for (File file : dir.listFiles()) {
//如果是一个文件夹,继续递归
if(file.isDirectory()){
scannerClass(packageName + "." + file.getName());
}else{
classNames.add(packageName + "." + file.getName().replace(".class", "").trim());
}
}
}
/**
* 完成注册
*/
private void doRegister(){
if(classNames.size() == 0){ return; }
for (String className : classNames) {
try {
Class<?> clazz = Class.forName(className);
Class<?> i = clazz.getInterfaces()[0];
registryMap.put(i.getName(), clazz.newInstance());
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.3RpcProxy类
public class RpcProxy {
public static <T> T create(Class<?> clazz){
//clazz传进来本身就是interface
MethodProxy proxy = new MethodProxy(clazz);
Class<?> [] interfaces = clazz.isInterface() ?
new Class[]{clazz} :
clazz.getInterfaces();
T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(),interfaces,proxy);
return result;
}
private static class MethodProxy implements InvocationHandler {
private Class<?> clazz;
public MethodProxy(Class<?> clazz){
this.clazz = clazz;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//如果传进来是一个已实现的具体类(本次演示略过此逻辑)
if (Object.class.equals(method.getDeclaringClass())) {
try {
return method.invoke(this, args);
} catch (Throwable t) {
t.printStackTrace();
}
//如果传进来的是一个接口(核心)
} else {
return rpcInvoke(proxy,method, args);
}
return null;
}
/**
* 实现接口的核心方法
* @param method
* @param args
* @return
*/
public Object rpcInvoke(Object proxy,Method method,Object[] args){
//传输协议封装
InvokerProtocol msg = new InvokerProtocol();
msg.setClassName(this.clazz.getName());
msg.setMethodName(method.getName());
msg.setValues(args);
msg.setParames(method.getParameterTypes());
final RpcProxyHandler consumerHandler = new RpcProxyHandler();
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//自定义协议解码器
/** 入参有5个,分别解释如下
maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。
lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置
lengthFieldLength:长度字段的长度:如:长度字段是int型表示,那么这个值就是4(long型就是8)
lengthAdjustment:要添加到长度字段值的补偿值
initialBytesToStrip:从解码帧中去除的第一个字节数
*/
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
//自定义协议编码器
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
//对象参数类型编码器
pipeline.addLast("encoder", new ObjectEncoder());
//对象参数类型解码器
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
pipeline.addLast("handler",consumerHandler);
}
});
ChannelFuture future = b.connect("localhost", 8080).sync();
future.channel().writeAndFlush(msg).sync();
future.channel().closeFuture().sync();
} catch(Exception e){
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
return consumerHandler.getResponse();
}
}
2.4RpcProxyHandler类
public class RpcProxyHandler extends ChannelInboundHandlerAdapter {
private Object response;
public Object getResponse() {
return response;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
response = msg;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exception is general");
}
}