Hadoop学习(五)——轻量级RPC框架
rpc是hadoop在运行过程中,服务器间相互访问的通讯基础,rpc底层是以socket的方式通讯,在hadoop中使用了较新的NIO,以提高网络的传输速度。
1. RPC原理学习
1.1. 什么是RPC
RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。
有一台机器,写一个order(@autowired),然后再在服务端写一个service(@rpcservice),一旦server加上注解,则对应的实现类就变成socket服务。当项目一启动的时候,程序会扫描注解,将所有的注解取出,spring会构造相应的bean,然后可以通过context获得工程中所有的service,放在对应的一个hashmap中直接调用,左边是注解id,右边是对应的服务类,基于这样的思维,写service的人就不需要太关注底层的关系,只需要写service并加上注解就可以了。
扫描注解的时候会启动某一个server,并带有一定的端口,此时客户端如果请求过来,只需要了解一定的接口信息即可,服务端一旦拿到请求,就会到map中找,找到即可确定service的位置,通过反射的方式去掉用实现类。
当客户端调用具体的方法时是用的动态代理,即需要给客户端注入一个动态代理对象,在对象中加入proxy,然后调用类中方法createorder,此调用会被invoke方法拦截,并进行相应的强化。
由上图的proxy可以看到,我们已知接口、method、参数,只需要将这三个数字封装一下,然后在invoke中封装成一个请求,即request对象。然后将这些值通过socket传到服务端。服务器端会decode解析request。然后使用具体的请求。
返回的结果会存放在result中,然后编码到response中,并通过socket返回给客户端。
当一个人开发完app后,如何知道他的server在哪里呢?
因此需要一个zookeeper,服务端在启动服务器的时候,会将注释向zookeeper注册,则客户端在调用的时候就会将接口名称,服务器的地址等参数向zookeeper中查询服务所在的位置,然后才会启动一个socket。
由此,完成了rpc框架的设计图。
此时发现socketserver是传统的通讯方式,因此引入一个NETTY框架,即使用NIO的原理来进行通讯。
1.2. RPC原理
运行时,一次客户机对服务器的RPC调用,其内部操作大致有如下十步:
1. 调用客户端句柄;执行传送参数
2. 调用本地系统内核发送网络消息
3. 消息传送到远程主机
4. 服务器句柄得到消息并取得参数
5. 执行远程过程
6. 执行的过程将结果返回服务器句柄
7. 服务器句柄返回结果,调用远程系统内核
8. 消息传回本地主机
9. 客户句柄由内核接收消息
10. 客户接收句柄返回的数据
2. nio原理学习(nio的优势不在于数据传送的速度)
2.1. 简介
nio 是New IO 的简称,在jdk1.4 里提供的新api 。Sun 官方标榜的特性如下: 为所有的原始类型提供(Buffer)缓存支持。字符集编码解码解决方案。 Channel :一个新的原始I/O 抽象。 支持锁和内存映射文件的文件访问接口。 提供多路(non-bloking) 非阻塞式的高伸缩性网络I/O 。
2.2. socket nio原理
2.2.1. 传统的I/O
使用传统的I/O程序读取文件内容, 并写入到另一个文件(或Socket), 如下程序:
File.read(fileDesc,buf, len);
Socket.send(socket,buf, len);
会有较大的性能开销, 主要表现在一下两方面:
1. 上下文切换(contextswitch), 此处有4次用户态和内核态的切换
2.Buffer内存开销, 一个是应用程序buffer, 另一个是系统读取buffer以及socket buffer
传统IO的原理:
如果想从磁盘读一些数据,数据首先会被操作系统堵在他们内核的缓存中,然后再交给应用的缓存,然后应用缓存中的数据会放到socket中的缓存中,然后再进入到NIC缓存中。
通过这多层交互,引发传输变慢,因此引发了NIO,来提升效率。
1) 先将文件内容从磁盘中拷贝到操作系统buffer
2) 再从操作系统buffer拷贝到程序应用buffer
3) 从程序buffer拷贝到socketbuffer
4) 从socketbuffer拷贝到协议引擎.
2.2.2. NIO
NIO技术省去了将操作系统的read buffer拷贝到程序的buffer, 以及从程序buffer拷贝到socket buffer的步骤,直接将 read buffer 拷贝到 socket buffer. java 的FileChannel.transferTo() 方法就是这样的实现, 这个实现是依赖于操作系统底层的sendFile()实现的.
publicvoidtransferTo(long position, long count, WritableByteChannel target);
他的底层调用的是系统调用sendFile()方法
sendfile(intout_fd, int in_fd, off_t *offset, size_t count);
如下图:
应用缓存只是传递一个指令过去,并没有实现缓存的交互,因此提升了传输速度。(jdk1.5之后是非阻塞IO,jdk1.7之后有异步非阻塞IO)
在通讯的发展过程中产生过伪异步。
即客户端socket的主线程,不同的产生子线程,用来访问服务端,以此来提高访问速度。
真正的异步是AIO:
在服务器代码中向内核注册一个监听器(select),一旦内核有事件发生就会通知监听器,则监听器立马发出指令,要求客户端与内核产生连接,客户端与内核建立成功后,就会产生三次握手,三次握手成功后内核会通知服务器端,然后select会继续注册一个read监听,如果客户端有数据进来,则read就会行程指令,通知服务端。
如果客户端有数据发送到内核中,内核中的缓存就会有数据,一旦内核发现缓存中有数据,就会将其放到应用系统在内核中建立的缓存中,然后通知应用系统,可以进行read了,此时应用系统才会通过channel去读,因此产生异步操作。
NIO的代码:
1)服务端线程:
服务端:
publicclass MultiplexerTimeServer implements Runnable {
private Selector selector;
privateServerSocketChannel servChannel;
privatevolatilebooleanstop;
/**
* 初始化多路复用器、绑定监听端口
* @param port
*/
publicMultiplexerTimeServer(intport) {
try {
selector = Selector.open();
servChannel = ServerSocketChannel.open();
servChannel.configureBlocking(false);
servChannel.socket().bind(newInetSocketAddress(port), 1024);
servChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The time server is start in port : " + port);
} catch (IOException e){
e.printStackTrace();
System.exit(1);
}
}
publicvoid stop() {
this.stop = true;
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
publicvoid run() {
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it= selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()){
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null)
key.channel().close();
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if (selector != null)
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
privatevoid handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
// 处理新接入的请求消息
if (key.isAcceptable()) {
// Accept the new connection
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// Add the new connection to the selector
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
// Read the data
SocketChannel sc =(SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
intreadBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = newbyte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("The time server receive order : "
+ body);
String currentTime = "QUERY TIME ORDER"
.equalsIgnoreCase(body) ? newjava.util.Date(
System.currentTimeMillis()).toString()
doWrite(sc, currentTime);
} elseif (readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else
; // 读到0字节,忽略
}
}
}
privatevoid doWrite(SocketChannel channel, String response)
throws IOException {
if (response != null && response.trim().length() > 0) {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
}
}
}
NIO的客户端:
public class TimeClientHandle implements Runnable {
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public TimeClientHandle(String host, int port) {
this.host = host == null ? "127.0.0.1" : host;
this.port = port;
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
} catch (IOException e){
e.printStackTrace();
System.exit(1);
}
}
/*
* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
try {
doConnect();
} catch (IOException e){
e.printStackTrace();
System.exit(1);
}
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it= selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()){
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null)
key.channel().close();
}
}
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if (selector != null)
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private void doConnect() throws IOException {
// 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
if (socketChannel.connect(new InetSocketAddress(host, port))) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
} else
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
private void doWrite(SocketChannel sc) throws IOException {
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
sc.write(writeBuffer);
if (!writeBuffer.hasRemaining())
System.out.println("Send order 2 server succeed.");
}
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
// 判断是否连接成功
SocketChannel sc = (SocketChannel) key.channel();
if (key.isConnectable()) {
if (sc.finishConnect()){
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc);
} else
System.exit(1);// 连接失败,进程退出
}
if (key.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
intreadBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = newbyte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("Now is : " + body);
this.stop = true;
} elseif (readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
}else
; // 读到0字节,忽略
}
}
}
}
客户端:
3. netty常用API学习
3.1. netty简介
Netty是基于Java NIO的网络应用框架.
Netty是一个NIO client-server(客户端服务器)框架,使用Netty可以快速开发网络应用,例如服务器和客户端协议。Netty提供了一种新的方式来使开发网络应用程序,这种新的方式使得它很容易使用和有很强的扩展性。Netty的内部实现时很复杂的,但是Netty提供了简单易用的api从网络处理代码中解耦业务逻辑。Netty是完全基于NIO实现的,所以整个Netty都是异步的。
网络应用程序通常需要有较高的可扩展性,无论是Netty还是其他的基于Java NIO的框架,都会提供可扩展性的解决方案。Netty中一个关键组成部分是它的异步特性.
3.2. netty的helloworld
3.2.1. 下载netty包
• 下载netty包,下载地址http://netty.io/
3.2.2. 服务端启动类
package com.netty.demo.server;
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel;
/** * • 配置服务器功能,如线程、端口 • 实现服务器处理程序,它包含业务逻辑,决定当有一个请求连接或接收数据时该做什么 * @author wilson */ public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public void start() throws Exception { EventLoopGroup eventLoopGroup = null; try { //创建ServerBootstrap实例来引导绑定和启动服务器 ServerBootstrap serverBootstrap = new ServerBootstrap(); //创建NioEventLoopGroup对象来处理事件,如接受新连接、接收数据、写数据等等 eventLoopGroup = new NioEventLoopGroup(); //指定通道类型为NioServerSocketChannel,设置InetSocketAddress让服务器监听某个端口已等待客户端连接。 serverBootstrap.group(eventLoopGroup) .channel(NioServerSocketChannel.class) .localAddress("localhost",port) .childHandler(new ChannelInitializer<Channel>() { //设置childHandler执行所有的连接请求 @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() .addLast(new EchoServerHandler()); } }); // 最后绑定服务器等待直到绑定完成,调用sync()方法会阻塞直到服务器完成绑定,然后服务器等待通道关闭,因为使用sync(),所以关闭操作也会被阻塞。 ChannelFuture channelFuture = serverBootstrap.bind().sync(); System.out.println("开始监听,端口为:" + channelFuture.channel().localAddress()); channelFuture.channel().closeFuture().sync(); } finally { eventLoopGroup.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { new EchoServer(20000).start(); } } |
3.2.3. 服务端回调方法
package com.netty.demo.server; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("server 读取数据……"); //读取数据 ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("接收客户端数据:" + body); //向客户端写数据 System.out.println("server向client发送数据"); String currentTime = new Date(System.currentTimeMillis()).toString(); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.write(resp); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("server 读取数据完毕.."); ctx.flush(); //刷新后才将数据发出到SocketChannel } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } |
3.2.4. 客户端启动类
package com.netty.demo.client;
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; /** * • 连接服务器 • 写数据到服务器 • 等待接受服务器返回相同的数据 • 关闭连接 * @author wilson */ public class EchoClient {
private final String host; private final int port;
public EchoClient(String host, int port) { this.host = host; this.port = port; }
public void start() throws Exception { EventLoopGroup nioEventLoopGroup = null; try { //创建Bootstrap对象用来引导启动客户端,客户端引导类。 Bootstrap bootstrap = new Bootstrap(); //创建EventLoopGroup对象并设置到Bootstrap中,EventLoopGroup可以理解为是一个线程池,这个线程池用来处理连接、接受数据、发送数据 nioEventLoopGroup = new NioEventLoopGroup(); //创建InetSocketAddress并设置到Bootstrap中,InetSocketAddress是指定连接的服务器地址 bootstrap.group(nioEventLoopGroup) .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host, port)) .handler(new ChannelInitializer<SocketChannel>() { //添加一个ChannelHandler,客户端成功连接服务器后就会被执行,用来存放处理业务逻辑的地方。
@Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new EchoClientHandler());
} }); // • 调用Bootstrap.connect()来连接服务器 ChannelFuture f = bootstrap.connect().sync(); // • 最后关闭EventLoopGroup来释放资源 f.channel().closeFuture().sync(); } finally { nioEventLoopGroup.shutdownGracefully().sync(); } }
public static void main(String[] args) throws Exception { new EchoClient("localhost", 20000).start(); } } |
3.2.5. 客户端回调方法
package com.netty.demo.client;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler;
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { //客户端连接服务器后被调用 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端连接服务器,开始发送数据……"); byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuf firstMessage = Unpooled.buffer(req.length); firstMessage.writeBytes(req); ctx.writeAndFlush(firstMessage); } //• 从服务器接收到数据后调用 @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("client 读取server数据.."); //服务端返回消息后 ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("服务端数据为 :" + body); } //• 发生异常时被调用 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("client exceptionCaught.."); // 释放资源 ctx.close(); } } |
3.3. netty中handler的执行顺序
3.3.1. 简介
Handler在netty中,无疑占据着非常重要的地位。Handler与Servlet中的filter很像,通过Handler可以完成通讯报文的解码编码、拦截指定的报文、统一对日志错误进行处理、统一对请求进行计数、控制Handler执行与否。一句话,没有它做不到的只有你想不到的。
Netty中的所有handler都实现自ChannelHandler接口。按照输出输出来分,分为ChannelInboundHandler、ChannelOutboundHandler两大类。ChannelInboundHandler对从客户端发往服务器的报文进行处理,一般用来执行解码、读取客户端数据、进行业务处理等;ChannelOutboundHandler对从服务器发往客户端的报文进行处理,一般用来进行编码、发送报文到客户端。
Netty中,可以注册多个handler。ChannelInboundHandler按照注册的先后顺序执行;ChannelOutboundHandler按照注册的先后顺序逆序执行,如下图所示,按照注册的先后顺序对Handler进行排序,request进入Netty后的执行顺序为:
3.3.2. 代码
服务端:
如上面的四个handler——EchInHandler1;EchInHandler2;EchOutHandler1;EchOutHandler2;
In的handler要顺序执行先执行1,然后执行2,而对于out,则反向执行,即先执行2,后执行1,如果代码的顺序进行修改,如下图:
顺序依然先2后1,但是如果将out放在in后面,则这两个out不会执行,就像上图中的顺序是不执行的。
Handler1:
fireChannelRead()是将数据流读入到另一个inHandler中,而write()是为了将数据流读入到OutHandler中。
Handler2:
outHandler1:
outHandler2:
ClientHandler:
Client:
3.3.3. 总结
在使用Handler的过程中,需要注意:
1、ChannelInboundHandler之间的传递,通过调用 ctx.fireChannelRead(msg) 实现;调用ctx.write(msg) 将传递到ChannelOutboundHandler。
2、ctx.write()方法执行后,需要调用flush()方法才能令它立即执行。
3、流水线pipeline中outhandler不能放在最后,否则不生效。
4、Handler的消费处理放在最后一个处理。
3.4. netty发送对象
3.4.1. 简介
Netty中,通讯的双方建立连接后,会把数据按照ByteBuf的方式进行传输,例如http协议中,就是通过HttpRequestDecoder对ByteBuf数据流进行处理,转换成http的对象。基于这个思路,我自定义一种通讯协议:Server和客户端直接传输java对象。
实现的原理是通过Encoder把java对象转换成ByteBuf流进行传输,通过Decoder把ByteBuf转换成java对象进行处理,处理逻辑如下图所示:
3.4.2. 代码
Netty一直发送的是字符串,如何进行对象发送:
只需要加一个序列化与反序列化。
那如何发送person对象。
Netty给的是bytebuff,需要转化成byte[],然后反序列化成String。
这是JAVA的IO流:
对象的序列化与反序列化用的工具为:
需要传递的对象为:
客户端为:
服务端:
工具类:
、
NIO的优势不在于速度,传统的IO会在多个地方行程阻塞;对服务器的性能影响比较大,
而NIO则会取消这些阻塞,减小服务器压力。
4. Spring(IOC/AOP)注解学习
4.1. spring的初始化顺序
在spring的配置文件中配置bean,如下
通过bean中的字符串构造对象,用的是反射的内容,xml文件中的注解方式有两种:其一、直接用component-scan,然后再在各个class的上面写上@component,程序加载时会自动扫描各个类,然后将其加载到内存中。
其二、用一个个的bean注解,添加到spring.xml文件中,然后手动加载文件,使各个类加载到内存中。
当程序加载各个类的时候,会自动运行各个类中的构造函数,以此形成对象。
在One类和Two类中,分别实现一个参数的构造如下
加载spring配置文件,初始化bean如下
那么。结果如何呢?
结论:spring会按照bean的顺序依次初始化xml中配置的所有bean
如何进行自定义注解:
首先需要在某个需要自定义注解的类的上方添加自定义注解的名称:@rpcserver;然后用context来获取到所有的标注有rpcserver的注解的类:
Map<String, Object> serviceBeanMap= ctx
.getBeansWithAnnotation(RpcService.class);
然后就可以使用,但是自定义注解需要一定的规则:
1) 自定义注解类
VM将在运行期也保留注解,因此可以通过反射机制读取注解的信息。
Component是为了让spring去扫描;
2) 然后在另外的类中加注解:
不仅可以运行rpcserver对象,而且可以传递相应的参数;
3) 然后再到主函数中,加载这一类对象,即可:
上面的Myserver是spring自己标注的类,然后在spring2.xml中配置了相应的包用来加载,当main函数加载spring2.xml文件时,会将所有@component的类加载进入内存中,加载的过程中,Myserver类也会被加载,顾Myserver的构造函数也会被执行,在构造Myserver时发现,这一个类实现了ApplicationContextAware接口,因此会实现这一接口中的方法,即:setApplicationContext方法,这一方法会通过反射方式将所有建有rpcserver标签的类全部构建起来。
这个类上的注解也可以被引入:
Stringvalue = serviceBean.getClass().GetAnnotation(Rpcservice.class).value();
类中有各种注解,spring中是允许构建的,但是有些注解虽然构建,spring却认不出来,此时需要反射的方式将这一自定义的注解加入到spring中。
另外还有一些方法可以加载自定义的注解类:
Test方法是可以直接运行的,将spring与Junit结合,类Myserver3中有个autowired注解,此时程序会将helloservice下的所有类注入进来。
4.1.1. 通过ApplicationContextAware加载Spring上下文环境
在One中实现ApplicationContextAware接口会出现如何的变换呢?
结果:
4.1.2. InitializingBean的作用
在One中实现InitializingBean接口呢?
结果:
4.1.3. 如果使用注解@Component
使用@Component注入类,那么它的顺序是如何呢?
4.1.4. 结论
1、 spring先检查注解注入的bean,并将它们实例化
2、 然后spring初始化bean的顺序是按照xml中配置的顺序依次执行构造
3、 如果某个类实现了ApplicationContextAware接口,会在类初始化完成后调用setApplicationContext()方法进行操作
4、 如果某个类实现了InitializingBean接口,会在类初始化完成后,并在setApplicationContext()方法执行完毕后,调用afterPropertiesSet()方法进行操作
4.2. 注解使用回顾
1、在spring中,用注解来向Spring容器注册Bean。需要在applicationContext.xml中注册<context:component-scanbase-package=”pagkage1[,pagkage2,…,pagkageN]”/>。
2、如果某个类的头上带有特定的注解@Component/@Repository/@Service/@Controller,就会将这个对象作为Bean注册进Spring容器
3、在使用spring管理的bean时,无需在对调用的对象进行new的过程,只需使用@Autowired将需要的bean注入本类即可
4.3. 自定义注解
4.3.1. 解释
1、自定义注解的作用:在反射中获取注解,以取得注解修饰的“类、方法、属性”的相关解释。
2、java内置注解
@Target 表示该注解用于什么地方,可能的 ElemenetType 参数包括:
ElemenetType.CONSTRUCTOR 构造器声明
ElemenetType.FIELD 域声明(包括 enum 实例)
ElemenetType.LOCAL_VARIABLE 局部变量声明
ElemenetType.METHOD 方法声明
ElemenetType.PACKAGE 包声明
ElemenetType.PARAMETER 参数声明
ElemenetType.TYPE 类,接口(包括注解类型)或enum声明
@Retention 表示在什么级别保存该注解信息。可选的 RetentionPolicy 参数包括:
RetentionPolicy.SOURCE 注解将被编译器丢弃
RetentionPolicy.CLASS 注解在class文件中可用,但会被VM丢弃
RetentionPolicy.RUNTIME JVM将在运行期也保留注释,因此可以通过反射机制读取注解的信息。
4.3.2. 实现
定义自定义注解
@Target({ ElementType.TYPE })//注解用在接口上 @Retention(RetentionPolicy.RUNTIME)//VM将在运行期也保留注释,因此可以通过反射机制读取注解的信息 @Component public @interface RpcService { String value(); } |
2、将直接类加到需要使用的类上,我们可以通过获取注解,来得到这个类
@RpcService("HelloService") public class HelloServiceImpl implements HelloService { public String hello(String name) { return "Hello! " + name; } } |
3、类实现的接口
public interface HelloService { String hello(String name); } |
4、通过ApplicationContext获取所有标记这个注解的类
@Component public class MyServer implements ApplicationContextAware { @SuppressWarnings("resource") public static void main(String[] args) { new ClassPathXmlApplicationContext("spring2.xml"); } public void setApplicationContext(ApplicationContext ctx) throws BeansException { Map<String, Object> serviceBeanMap = ctx .getBeansWithAnnotation(RpcService.class); for (Object serviceBean : serviceBeanMap.values()) { try { Method method = serviceBean.getClass().getMethod("hello", new Class[]{String.class}); Object invoke = method.invoke(serviceBean, "bbb"); System.out.println(invoke); } catch (Exception e) { e.printStackTrace(); } } } } |
5、 结合spring实现junit测试
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring2.xml") public class MyServer implements ApplicationContextAware { @Test public void helloTest1() { } public void setApplicationContext(ApplicationContext ctx) throws BeansException { Map<String, Object> serviceBeanMap = ctx .getBeansWithAnnotation(RpcService.class); for (Object serviceBean : serviceBeanMap.values()) { try { Method method = serviceBean.getClass().getMethod("hello", new Class[] { String.class }); Object invoke = method.invoke(serviceBean, "bbb"); System.out.println(invoke); } catch (Exception e) { e.printStackTrace(); } } } } |
5. 轻量级RPC框架开发
5.1. 轻量级RPC框架需求分析及原理分析
5.1.1. netty实现的RPC的缺点
rpc自身的框架可以直接构建:
在实际应用中,一般构建业务需要的类,实现业务需要的接口,此时,该如何使用rpc服务的类,如下:
程序构建的是一个HelloServiceImpl类,实现了HelloService接口,如果要调用rpc的服务,此时只需要在注解中添加@RPCService(HelloService.class)注解。
用上面的方式,程序员只需要自己创建一个类,然后加上公司提供的注解:rpcservice,则该类即可对外提供服务。
具体该如何实现?
在完成上面要求之前,需要在配置文件(spring.xml)中进行相应的配置:
serviceRegistry与rpcserver两个类是jar包中写好的类,因此无法进行修改,此时将这两个类导入到文件中,然后引入自动扫包标签。
Rpcserver类中注入了ApplicationContextAware,并通过反射的方式(如上面介绍)自定义了rpcserver标签,然后在spring.xml文件中定义扫包标签,此时,启动程序则会自动的启动rpcserver服务。
可视化其流程为:
1) 用户构建工程,其中实现helloserverimpl类,并加上rpcserver注解,同时确定一个spring.xml文件,引入rpcserver的bean;
2) 然后构建bootstrap,在其中的main方法中引入spring.xml文件,运行时便会启动
Springcontext,之后spring就开始工作了,此时spring就会扫描工程中所有的注解,一扫描就会扫到bean中的各种类。
3) spring将bean中的内容进行构造,构造出框架中的rpcserver,rpcserver中要implements contextAware,其中有一个方法是setApplicationContext(),可以在其中启动netty服务。
4) 在setApplicationContext()中定义一个beanmap,然后netty.start(),接下来只需要@rpcserver标签就可以了。
下面可以具体看看RPCServer:
public class RpcServer implements ApplicationContextAware, InitializingBean {
private static final Logger LOGGER= LoggerFactory.getLogger(RpcServer.class);
private String serverAddress;
privateServiceRegistry serviceRegistry;
private Map<String,Object> handlerMap = new HashMap<String, Object>();
publicRpcServer(String serverAddress) {
this.serverAddress = serverAddress;
}
publicRpcServer(String serverAddress, ServiceRegistry serviceRegistry){
this.serverAddress = serverAddress;
this.serviceRegistry = serviceRegistry;
}
/**
* 通过注解,获取RpcService.class,将它放到map中
*/
public void setApplicationContext(ApplicationContext ctx)
throws BeansException{
Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class);
if (MapUtils.isNotEmpty(serviceBeanMap)) {
for (Object serviceBean : serviceBeanMap.values()) {
String interfaceName= serviceBean.getClass().getAnnotation(RpcService.class).value().getName();
handlerMap.put(interfaceName, serviceBean);
}
}
}
/**
* 服务端启动类
*/
public void afterPropertiesSet() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel)
throws Exception {
channel.pipeline()
.addLast(new RpcDecoder(RpcRequest.class))
// 注册解码 in-1
.addLast(new RpcEncoder(RpcResponse.class))
// 注册编码 out
.addLast(new RpcHandler(handlerMap));
//注册RpcHandler in-2
}
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
String[] array = serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);
ChannelFuture future = bootstrap.bind(host, port).sync();
LOGGER.debug("server started on port {}", port);
if (serviceRegistry != null) {
serviceRegistry.register(serverAddress);
}
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
在此基础上接下来netty业务该如何操作:
1) 绑定消息处理流水线;
2) in解码用户的请求;
3) in业务调用的handler;
4) out将结果编码的handler
同时调用registerServer注册:
用户请求进来后会通过request进入到netty的第一个handler中,第一个handler往往是一个decode方法,即RPCDecoder,在此类中进行decode方法调用,对文件进行解码;
在deserialize中进行反序列化:
此时decode中才会将反序列化的数据add到out中。
一旦有数据往下传输,数据会进入到netty中的第二个handler中,
即:new RpcHandler(handlerMap);在RpcHandler中有一个channelread0的方法,一旦有数据进来,数据会进入到channelRead0()中,数据会以request的方式传输,进入到handle方法中,channelRead0结束之后就会进入到out中,
out中有一个encode方法用来编码。
截止到目前,服务端就将请求做完了,这条线是用户有请求时便会执行的;
另外还有一条线是程序一启动就会执行的,即实现向zookeeper注册,对于用户来讲并不知道服务器在哪里,因此需要用zookeeper来进行导引,在RPCserver中,有serviceRegister类:
然后将这个serviceRegistry注入到rpcserver中去,因此rpcserver调用运行时
serviceRegistry不为空:
上面serverAddress地址为服务器的地址,即nettyserver的地址。
在注册类中,主要有三个文件:
serviceDiscovery.java主要针对客户端;serviceRegistry.java主要针对服务端;分别安装在两个jar包里。
类文件为:
/**
* 服务注册,ZK 在该架构中扮演了“服务注册表”的角色,用于注册所有服务器的地址与端口,并对客户端提供服务发现的功能
*/
publicclass ServiceRegistry {
privatestaticfinal Logger LOGGER= LoggerFactory.getLogger(ServiceRegistry.class);
private CountDownLatchlatch = new CountDownLatch(1);
private String registryAddress;
publicServiceRegistry(String registryAddress) {
this.registryAddress = registryAddress;
}
/**
* 创建zookeeper链接
* @param data
*/
public void register(String data) {
if (data != null) {
ZooKeeper zk =connectServer();
if (zk!= null) {
createNode(zk,data);
}
}
}
/**
* 创建zookeeper链接,监听
* @return
*/
private ZooKeeperconnectServer() {
ZooKeeper zk = null;
try {
zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT,
new Watcher() {
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
latch.await();
} catch (Exception e){
LOGGER.error("", e);
}
return zk;
}
/**
* 创建节点
* @param zk
* @param data
*/
private void createNode(ZooKeeper zk, String data) {
try {
byte[] bytes = data.getBytes();
if (zk.exists(Constant.ZK_REGISTRY_PATH, null) == null) {
zk.create(Constant.ZK_REGISTRY_PATH, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
String path = zk.create(Constant.ZK_DATA_PATH, bytes,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
LOGGER.debug("create zookeeper node ({} =>{})", path, data);
} catch (Exception e){
LOGGER.error("", e);
}
}
}
则这个如何向zk注册,serviceRegistry在注册的时候传进来的即为zk的地址,
在rpcserver类中调用的是这个类下的register方法,registryAddress的地址为zk的地址。
下图中的地址为serverAddress的地址,下一句的目的是为了将serverAddress注册到zk中,
在register方法中先new一个zk,即connectServer方法:
返回一个zk,并用这个zk创建并返回一个节点:
创建完成节点后,就像zk中注册本服务器的地址信息。
以上为服务端的设计,客户端该如何设计呢?
客户端的类为:
/**
* RPC 客户端(用于发送 RPC 请求)
*/
public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {
private static final Logger LOGGER= LoggerFactory.getLogger(RpcClient.class);
private String host;
private int port;
private RpcResponse response;
private final Object obj = new Object();
public RpcClient(String host, intport) {
this.host = host;
this.port = port;
}
/**
* 链接服务端,发送消息
* @param request
* @return
* @throws Exception
*/
public RpcResponsesend(RpcRequest request) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
publicvoid initChannel(SocketChannel channel)
throws Exception {
// 向pipeline中添加编码、解码、业务处理的handler
channel.pipeline()
.addLast(new RpcEncoder(RpcRequest.class))
.addLast(new RpcDecoder(RpcResponse.class))
.addLast(RpcClient.this);
}
}).option(ChannelOption.SO_KEEPALIVE, true);
// 链接服务器
ChannelFuture future = bootstrap.connect(host, port).sync();
future.channel().writeAndFlush(request).sync();
// 用线程等待的方式决定是否关闭连接
synchronized(obj) {
obj.wait();
}
if (response != null) {
future.channel().closeFuture().sync();
}
returnresponse;
} finally {
group.shutdownGracefully();
}
}
/**
* 读取服务端的返回结果
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, RpcResponse response)
throws Exception {
this.response = response;
synchronized(obj) {
obj.notifyAll();
}
}
/**
* 异常处理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
LOGGER.error("client caught exception", cause);
ctx.close();
}
}
代理类的处理方式:
/**
* RPC 代理(用于创建 RPC 服务代理)
*/
public class RpcProxy {
private String serverAddress;
private ServiceDiscovery serviceDiscovery;
public RpcProxy(String serverAddress) {
this.serverAddress = serverAddress;
}
public RpcProxy(ServiceDiscovery serviceDiscovery) {
this.serviceDiscovery = serviceDiscovery;
}
/**
* 创建代理
* @param interfaceClass
* @return
*/
@SuppressWarnings("unchecked")
public <T> Tcreate(Class<?> interfaceClass) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
newClass<?>[] { interfaceClass }, newInvocationHandler() {
public Objectinvoke(Object proxy, Method method,
Object[] args) throws Throwable {
//创建RpcRequest,封装被代理类的属性
RpcRequest request = new RpcRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass()
.getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
//查找服务
if (serviceDiscovery != null) {
serverAddress = serviceDiscovery.discover();
}
//随机获取服务的地址
String[] array = serverAddress.split(":");
String host = array[0];
intport = Integer.parseInt(array[1]);
//创建RpcClient,链接服务端
RpcClient client = new RpcClient(host, port);
//通过netty实现RPC
RpcResponse response = client.send(request);
//返回信息
if (response.isError()) {
throwresponse.getError();
} else {
returnresponse.getResult();
}
}
});
}
}
客户端的设计:
用proxy创建一个HelloServer的业务代理,然后用动态代理创建HelloServer业务类的对应方法。
在代理类proxy中,可以配置相应的request中,
以此来确定请求专递给谁,并且传递什么内容,紧接着发现服务器在哪里;
在discover中如何发现服务器:
如果size大于零,说明有服务器,就可以发送相应的请求。
此时可以安装负载均衡的算法。
在发送之前会有一个编码的过程,这一过程是在send中实现的,然后会发送给到服务端。
Spring中应配置为:
客户端的示意图为:
1) 创建spring.xml文件并注入tomcat中的listener:springlistener,spring一旦启动,回去加载配置文件spring.xml,构造出rpcproxy,并交给rpcproxy类,;
2) 用户会用rpcproxy.create(业务接口类.class),创建业务接口service,然后用这个service调用hello方法。
3) Create创建一个动态代理对象,这个service调用的便是动态代理对象中的invoke方法,在这invoke方法中发现服务器的地址,封装request对象,启动nettyclient客户端,调用send方法,调用了netty中的out流水线,就会到达服务端。
4) 服务端处理完成后,就会返回response中,进行反序列化,从中提取result并返回。
在我们平常使用的RPC中,例如webservice,使用的习惯类似于下图:
但是netty的实现过于底层,我们不能够像以前一样只关心方法的调用,而是要关心数据的传输,对于不熟悉netty的开发者,需要了解很多netty的概念和逻辑,才能实现RPC的调用。
应上面的需求,我们需要基于netty实现一个我们熟悉的RPC框架。逻辑如下:
5.2. zookeeper API简单使用及框架介绍
5.2.1. zk在框架中的实现
在上面的框架中,server端存在着一个问题,就是单点问题,也就是说,当服务端“挂了”之后,框架的使用就造成了单点屏障。
我们可以通过zookeeper来实现服务端的负载均衡