学习——Netty(实战自定义RPC)
使用Netty自定义 RPC
1.概述
RPC(Remote Procedure Call),即远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络实现的技术。常见的 RPC 框架有: 源自阿里的 Dubbo,Spring 旗下的 Spring Cloud,Google 出品的 grpc 等等。
- 服务消费方(client)以本地调用方式调用服务
- client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
- client stub 将消息进行编码并发送到服务端
- server stub 收到消息后进行解码
- server stub 根据解码结果调用本地的服务
- 本地服务执行并将结果返回给 server stub
- server stub 将返回导入结果进行编码并发送至消费方
- client stub 接收到消息并进行解码
- 服务消费方(client)得到结果
RPC 的目标就是将 2-8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用。接下来我们基于 Netty 自己动手搞定一个 RPC。
2.设计和实现
2.1.结构设计
- Client(服务的调用方): 两个接口 + 一个包含 main 方法的测试类
- Client Stub: 一个客户端代理类 + 一个客户端业务处理类
- Server(服务的提供方): 两个接口 + 两个实现类
- Server Stub: 一个网络处理服务器 + 一个服务器业务处理类
注意:服务调用方的接口必须跟服务提供方的接口保持一致(包路径可以不一致)
最终要实现的目标是:在 TestNettyRPC 中远程调用 HelloRPCImpl 或 HelloNettyImpl 中的方法
2.2.代码实现
Server( 服务的提供方)
两个接口和两个实现类,供消费方远程调用。
public interface HelloNetty {
String hello();
}
public class HelloNettyImpl implements HelloNetty {
@Override
public String hello() {
return "hello,netty";
}
}
public interface HelloRPC {
String hello(String name);
}
public class HelloRPCImpl implements HelloRPC {
@Override
public String hello(String name) {
return "hello," + name;
}
}
Server Stub
//封装类信息
public class ClassInfo implements Serializable {
private static final long serialVersionUID = 1L;
private String className; //类名
private String methodName; //方法名
private Class<?>[] types; //参数类型
private Object[] objects; //参数列表
//此处省略 getter 和 setter 方法
}
//服务器端业务处理类
public class InvokeHandler extends ChannelInboundHandlerAdapter {
//得到某接口下某个实现类的名字
private String getImplClassName(ClassInfo classInfo) throws Exception{
/服务方接口和实现类所在的包路径
String interfacePath="cn.itcast.rpc.server";
int lastDot = classInfo.getClassName().lastIndexOf(".");
String interfaceName=classInfo.getClassName().substring(lastDot);
Class superClass=Class.forName(interfacePath+interfaceName);
Reflections reflections = new Reflections(interfacePath);
//得到某接口下的所有实现类
Set<Class> ImplClassSet=reflections.getSubTypesOf(superClass);
if(ImplClassSet.size()==0){
System.out.println("未找到实现类");
return null;
}else if(ImplClassSet.size()>1){
System.out.println("找到多个实现类,未明确使用哪一个");
return null;
}else {
//把集合转换为数组
Class[] classes=ImplClassSet.toArray(new Class[0]);
return classes[0].getName(); //得到实现类的名字
}
}
@Override //读取客户端发来的数据并通过反射调用实现类的方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ClassInfo classInfo = (ClassInfo) msg;
Object clazz = Class.forName(getImplClassName(classInfo)).newInstance();
Method method = clazz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());
//通过反射调用实现类的方法
Object result = method.invoke(clazz, classInfo.getObjects());
ctx.writeAndFlush(result);
}
}
//采用 Netty 自带的 ObjectEncoder 和 ObjectDecoder作为编解码器(为了降低复杂度,这里并没有使用第三方的编解码器),当然实际开发时也可以采用 JSON 或 XML。
public class NettyRPCServer {
private int port;
public NettyRPCServer(int port) {
this.port = port;
}
public void start() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.localAddress(port).childHandler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//编码器
pipeline.addLast("encoder", new ObjectEncoder());
//解码器
pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.cacheDisabled(null)));
//服务器端业务处理类
pipeline.addLast(new InvokeHandler());
}
});
ChannelFuture future = serverBootstrap.bind(port).sync();
System.out.println("......Server is ready......");
future.channel().closeFuture().sync();
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new NettyRPCServer(9999).start();
}
}
Client Stub
//客户端业务处理类
public class ResultHandler extends ChannelInboundHandlerAdapter {
private Object response;
public Object getResponse() {
return response;
}
@Override //读取服务器端返回的数据(远程调用的结果)
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
response = msg;
ctx.close();
}
}
//客户端代理类
public class NettyRPCProxy {
//根据接口创建代理对象
public static Object create(Class target) {
return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
//封装 ClassInfo
ClassInfo classInfo = new ClassInfo();
classInfo.setClassName(target.getName());
classInfo.setMethodName(method.getName());
classInfo.setObjects(args);
classInfo.setTypes(method.getParameterTypes());
//开始用 Netty 发送数据
EventLoopGroup group = new NioEventLoopGroup();
ResultHandler resultHandler = new ResultHandler();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//编码器
pipeline.addLast("encoder", new ObjectEncoder());
//解码器
pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.cacheDisabled(null)));
//客户端业务处理类
pipeline.addLast("handler", resultHandler);
}
});
ChannelFuture future = b.connect("127.0.0.1", 9999).sync();
future.channel().writeAndFlush(classInfo).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
return resultHandler.getResponse();
}
});
}
}
Client( 服务的调用方- 消费方)
//服务调用方
public class TestNettyRPC {
public static void main(String [] args){
//第 1 次远程调用
HelloNetty helloNetty=(HelloNetty) NettyRPCProxy.create(HelloNetty.class);
System.out.println(helloNetty.hello());
//第 2 次远程调用
HelloRPC helloRPC = (HelloRPC) NettyRPCProxy.create(HelloRPC.class);
System.out.println(helloRPC.hello("RPC"));
}
}