Hadoop学习(五)——轻量级RPC框架

        rpc是hadoop在运行过程中,服务器间相互访问的通讯基础,rpc底层是以socket的方式通讯,在hadoop中使用了较新的NIO,以提高网络的传输速度。

1.  RPC原理学习

1.1.  什么是RPC

         RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

         RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。

Hadoop学习(五)——轻量级RPC框架                          

有一台机器,写一个order(@autowired),然后再在服务端写一个service(@rpcservice),一旦server加上注解,则对应的实现类就变成socket服务。当项目一启动的时候,程序会扫描注解,将所有的注解取出,spring会构造相应的bean,然后可以通过context获得工程中所有的service,放在对应的一个hashmap中直接调用,左边是注解id,右边是对应的服务类,基于这样的思维,写service的人就不需要太关注底层的关系,只需要写service并加上注解就可以了。

扫描注解的时候会启动某一个server,并带有一定的端口,此时客户端如果请求过来,只需要了解一定的接口信息即可,服务端一旦拿到请求,就会到map中找,找到即可确定service的位置,通过反射的方式去掉用实现类。

         当客户端调用具体的方法时是用的动态代理,即需要给客户端注入一个动态代理对象,在对象中加入proxy,然后调用类中方法createorder,此调用会被invoke方法拦截,并进行相应的强化。

Hadoop学习(五)——轻量级RPC框架        

         由上图的proxy可以看到,我们已知接口、method、参数,只需要将这三个数字封装一下,然后在invoke中封装成一个请求,即request对象。然后将这些值通过socket传到服务端。服务器端会decode解析request。然后使用具体的请求。

Hadoop学习(五)——轻量级RPC框架

         返回的结果会存放在result中,然后编码到response中,并通过socket返回给客户端。

Hadoop学习(五)——轻量级RPC框架

         当一个人开发完app后,如何知道他的server在哪里呢?

         因此需要一个zookeeper,服务端在启动服务器的时候,会将注释向zookeeper注册,则客户端在调用的时候就会将接口名称,服务器的地址等参数向zookeeper中查询服务所在的位置,然后才会启动一个socket。

Hadoop学习(五)——轻量级RPC框架

         由此,完成了rpc框架的设计图。

         此时发现socketserver是传统的通讯方式,因此引入一个NETTY框架,即使用NIO的原理来进行通讯。

1.2.  RPC原理

Hadoop学习(五)——轻量级RPC框架

运行时,一次客户机对服务器的RPC调用,其内部操作大致有如下十步:

1. 调用客户端句柄;执行传送参数

2. 调用本地系统内核发送网络消息

3. 消息传送到远程主机

4. 服务器句柄得到消息并取得参数

5. 执行远程过程

6. 执行的过程将结果返回服务器句柄

7. 服务器句柄返回结果,调用远程系统内核

8. 消息传回本地主机

9. 客户句柄由内核接收消息

10. 客户接收句柄返回的数据

2.  nio原理学习(nio的优势不在于数据传送的速度)

2.1.  简介

         nio 是New IO 的简称,在jdk1.4 里提供的新api 。Sun 官方标榜的特性如下: 为所有的原始类型提供(Buffer)缓存支持。字符集编码解码解决方案。 Channel :一个新的原始I/O 抽象。 支持锁和内存映射文件的文件访问接口。 提供多路(non-bloking) 非阻塞式的高伸缩性网络I/O 。

2.2.  socket nio原理

2.2.1.   传统的I/O

使用传统的I/O程序读取文件内容, 并写入到另一个文件(或Socket), 如下程序:

File.read(fileDesc,buf, len);

Socket.send(socket,buf, len);

会有较大的性能开销, 主要表现在一下两方面:

1. 上下文切换(contextswitch), 此处有4次用户态和内核态的切换

2.Buffer内存开销, 一个是应用程序buffer, 另一个是系统读取buffer以及socket buffer

传统IO的原理:

如果想从磁盘读一些数据,数据首先会被操作系统堵在他们内核的缓存中,然后再交给应用的缓存,然后应用缓存中的数据会放到socket中的缓存中,然后再进入到NIC缓存中。

通过这多层交互,引发传输变慢,因此引发了NIO,来提升效率。

Hadoop学习(五)——轻量级RPC框架

1) 先将文件内容从磁盘中拷贝到操作系统buffer

2) 再从操作系统buffer拷贝到程序应用buffer

3) 从程序buffer拷贝到socketbuffer

4) 从socketbuffer拷贝到协议引擎.

2.2.2.   NIO

NIO技术省去了将操作系统的read buffer拷贝到程序的buffer, 以及从程序buffer拷贝到socket buffer的步骤,直接将 read buffer 拷贝到 socket buffer. java 的FileChannel.transferTo() 方法就是这样的实现, 这个实现是依赖于操作系统底层的sendFile()实现的.

publicvoidtransferTo(long position, long count, WritableByteChannel target);

他的底层调用的是系统调用sendFile()方法

sendfile(intout_fd, int in_fd, off_t *offset, size_t count);

如下图:

应用缓存只是传递一个指令过去,并没有实现缓存的交互,因此提升了传输速度。(jdk1.5之后是非阻塞IO,jdk1.7之后有异步非阻塞IO)

 Hadoop学习(五)——轻量级RPC框架

   

在通讯的发展过程中产生过伪异步。

即客户端socket的主线程,不同的产生子线程,用来访问服务端,以此来提高访问速度。

Hadoop学习(五)——轻量级RPC框架

真正的异步是AIO:

         在服务器代码中向内核注册一个监听器(select),一旦内核有事件发生就会通知监听器,则监听器立马发出指令,要求客户端与内核产生连接,客户端与内核建立成功后,就会产生三次握手,三次握手成功后内核会通知服务器端,然后select会继续注册一个read监听,如果客户端有数据进来,则read就会行程指令,通知服务端。

         如果客户端有数据发送到内核中,内核中的缓存就会有数据,一旦内核发现缓存中有数据,就会将其放到应用系统在内核中建立的缓存中,然后通知应用系统,可以进行read了,此时应用系统才会通过channel去读,因此产生异步操作。

Hadoop学习(五)——轻量级RPC框架

NIO的代码:

1)服务端线程:

Hadoop学习(五)——轻量级RPC框架        

服务端:

publicclass MultiplexerTimeServer implements Runnable {

    private Selector selector;

    privateServerSocketChannel servChannel;

    privatevolatilebooleanstop;

    /**

    * 初始化多路复用器、绑定监听端口

     * @param port

     */

    publicMultiplexerTimeServer(intport) {

    try {

        selector = Selector.open();

        servChannel = ServerSocketChannel.open();

        servChannel.configureBlocking(false);

        servChannel.socket().bind(newInetSocketAddress(port), 1024);

        servChannel.register(selector, SelectionKey.OP_ACCEPT);

       System.out.println("The time server is start in port : " + port);

    } catch (IOException e){

        e.printStackTrace();

       System.exit(1);

    }

}

    publicvoid stop() {

    this.stop = true;

    }

    /* (non-Javadoc)

     * @see java.lang.Runnable#run()

     */

    @Override

    publicvoid run() {

    while (!stop) {

        try {

        selector.select(1000);

        Set<SelectionKey> selectedKeys = selector.selectedKeys();

        Iterator<SelectionKey> it= selectedKeys.iterator();

        SelectionKey key = null;

        while (it.hasNext()){

           key = it.next();

           it.remove();

           try {

            handleInput(key);

           } catch (Exception e) {

            if (key != null) {

               key.cancel();

               if (key.channel() != null)

                key.channel().close();

            }

           }

        }

        } catch (Throwable t) {

        t.printStackTrace();

        }

    }

    // 多路复用器关闭后,所有注册在上面的ChannelPipe等资源都会被自动去注册并关闭,所以不需要重复释放资源

    if (selector != null)

        try {

        selector.close();

        } catch (IOException e) {

        e.printStackTrace();

        }

    }

    privatevoid handleInput(SelectionKey key) throws IOException {

    if (key.isValid()) {

        // 处理新接入的请求消息

        if (key.isAcceptable()) {

        // Accept the new connection

        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();

        SocketChannel sc = ssc.accept();

        sc.configureBlocking(false);

        // Add the new connection to the selector

        sc.register(selector, SelectionKey.OP_READ);

        }

        if (key.isReadable()) {

        // Read the data

        SocketChannel sc =(SocketChannel) key.channel();

        ByteBuffer readBuffer = ByteBuffer.allocate(1024);

        intreadBytes = sc.read(readBuffer);

        if (readBytes > 0) {

           readBuffer.flip();

           byte[] bytes = newbyte[readBuffer.remaining()];

           readBuffer.get(bytes);

           String body = new String(bytes, "UTF-8");

           System.out.println("The time server receive order : "

               + body);

           String currentTime = "QUERY TIME ORDER"

               .equalsIgnoreCase(body) ? newjava.util.Date(

               System.currentTimeMillis()).toString()

           doWrite(sc, currentTime);

        } elseif (readBytes < 0) {

           // 对端链路关闭

           key.cancel();

           sc.close();

        } else

           ; // 读到0字节,忽略

        }

    }

    }

    privatevoid doWrite(SocketChannel channel, String response)

        throws IOException {

    if (response != null && response.trim().length() > 0) {

        byte[] bytes = response.getBytes();

       ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);

        writeBuffer.put(bytes);

        writeBuffer.flip();

        channel.write(writeBuffer);

    }

    }

}

NIO的客户端:

public class TimeClientHandle implements Runnable {

    private String host;

    private int port;

    private Selector selector;

    private SocketChannel socketChannel;

    private volatile boolean stop;

    public TimeClientHandle(String host, int port) {

    this.host = host == null ? "127.0.0.1" : host;

    this.port = port;

    try {

        selector = Selector.open();

        socketChannel = SocketChannel.open();

        socketChannel.configureBlocking(false);

    } catch (IOException e){

        e.printStackTrace();

       System.exit(1);

    }

    }

    /*

     * (non-Javadoc)

     * @see java.lang.Runnable#run()

     */

    @Override

    public void run() {

    try {

       doConnect();

    } catch (IOException e){

        e.printStackTrace();

       System.exit(1);

    }

    while (!stop) {

        try {

        selector.select(1000);

        Set<SelectionKey> selectedKeys = selector.selectedKeys();

        Iterator<SelectionKey> it= selectedKeys.iterator();

        SelectionKey key = null;

        while (it.hasNext()){

           key = it.next();

           it.remove();

           try {

            handleInput(key);

           } catch (Exception e) {

            if (key != null) {

               key.cancel();

               if (key.channel() != null)

                key.channel().close();

            }

           }

        }

        } catch (Exception e) {

        e.printStackTrace();

        System.exit(1);

        }

    }

    // 多路复用器关闭后,所有注册在上面的ChannelPipe等资源都会被自动去注册并关闭,所以不需要重复释放资源

    if (selector != null)

        try {

        selector.close();

        } catch (IOException e) {

        e.printStackTrace();

        }

    }

    private void doConnect() throws IOException {

    // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答

    if (socketChannel.connect(new InetSocketAddress(host, port))) {

        socketChannel.register(selector, SelectionKey.OP_READ);

       doWrite(socketChannel);

    } else

        socketChannel.register(selector, SelectionKey.OP_CONNECT);

    }

    private void doWrite(SocketChannel sc) throws IOException {

    byte[] req = "QUERY TIME ORDER".getBytes();

    ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);

    writeBuffer.put(req);

    writeBuffer.flip();

    sc.write(writeBuffer);

    if (!writeBuffer.hasRemaining())

       System.out.println("Send order 2 server succeed.");

    }

    private void handleInput(SelectionKey key) throws IOException {

    if (key.isValid()) {

        // 判断是否连接成功

       SocketChannel sc = (SocketChannel) key.channel();

        if (key.isConnectable()) {

        if (sc.finishConnect()){

           sc.register(selector, SelectionKey.OP_READ);

           doWrite(sc);

        } else

           System.exit(1);// 连接失败,进程退出

        }

        if (key.isReadable()) {

        ByteBuffer readBuffer = ByteBuffer.allocate(1024);

        intreadBytes = sc.read(readBuffer);

        if (readBytes > 0) {

           readBuffer.flip();

           byte[] bytes = newbyte[readBuffer.remaining()];

           readBuffer.get(bytes);

           String body = new String(bytes, "UTF-8");

           System.out.println("Now is : " + body);

           this.stop = true;

        } elseif (readBytes < 0) {

           // 对端链路关闭

           key.cancel();

           sc.close();

        }else

            ; // 读到0字节,忽略

        }

    }

  }

}

客户端:

Hadoop学习(五)——轻量级RPC框架

3.  netty常用API学习

3.1.  netty简介

     Netty是基于Java NIO的网络应用框架.

    Netty是一个NIO client-server(客户端服务器)框架,使用Netty可以快速开发网络应用,例如服务器和客户端协议。Netty提供了一种新的方式来使开发网络应用程序,这种新的方式使得它很容易使用和有很强的扩展性。Netty的内部实现时很复杂的,但是Netty提供了简单易用的api从网络处理代码中解耦业务逻辑。Netty是完全基于NIO实现的,所以整个Netty都是异步的。

网络应用程序通常需要有较高的可扩展性,无论是Netty还是其他的基于Java NIO的框架,都会提供可扩展性的解决方案。Netty中一个关键组成部分是它的异步特性.

3.2.  netty的helloworld

3.2.1.   下载netty包

•       下载netty包,下载地址http://netty.io/

3.2.2.   服务端启动类

package com.netty.demo.server;

 

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.Channel;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioServerSocketChannel;

 

/**

 * • 配置服务器功能,如线程、端口 • 实现服务器处理程序,它包含业务逻辑,决定当有一个请求连接或接收数据时该做什么

* @author wilson

*/

public class EchoServer {

        private final int port;

        public EchoServer(int port) {

              this.port = port;

       }

        public void start() throws Exception {

              EventLoopGroup eventLoopGroup = null;

              try {

                     //创建ServerBootstrap实例来引导绑定和启动服务器

                     ServerBootstrap serverBootstrap = new ServerBootstrap();

                     //创建NioEventLoopGroup对象来处理事件,如接受新连接、接收数据、写数据等等

                     eventLoopGroup = new NioEventLoopGroup();

                     //指定通道类型为NioServerSocketChannel,设置InetSocketAddress让服务器监听某个端口已等待客户端连接。

                     serverBootstrap.group(eventLoopGroup)

.channel(NioServerSocketChannel.class)

.localAddress("localhost",port)

.childHandler(new ChannelInitializer<Channel>() {

                            //设置childHandler执行所有的连接请求

                            @Override

                            protected void initChannel(Channel ch) throws Exception {

                                   ch.pipeline()

.addLast(new EchoServerHandler());

                            }

                                   });

                     // 最后绑定服务器等待直到绑定完成,调用sync()方法会阻塞直到服务器完成绑定,然后服务器等待通道关闭,因为使用sync(),所以关闭操作也会被阻塞。

                     ChannelFuture channelFuture = serverBootstrap.bind().sync();

                     System.out.println("开始监听,端口为:" +

 channelFuture.channel().localAddress());

                     channelFuture.channel().closeFuture().sync();

              } finally {

                     eventLoopGroup.shutdownGracefully().sync();

              }

       }

        public static void main(String[] args) throws Exception {

              new EchoServer(20000).start();

       }

}

3.2.3.   服务端回调方法

package com.netty.demo.server;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelFutureListener;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

       @Override

       public void channelRead(ChannelHandlerContext ctx, Object msg)

                     throws Exception {

              System.out.println("server 读取数据……");

              //读取数据

        ByteBuf buf = (ByteBuf) msg;

        byte[] req = new byte[buf.readableBytes()];

        buf.readBytes(req);

        String body = new String(req, "UTF-8");

        System.out.println("接收客户端数据:" + body);

        //向客户端写数据

        System.out.println("server向client发送数据");

        String currentTime = new Date(System.currentTimeMillis()).toString();

        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());

        ctx.write(resp);

       }

       @Override

       public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

              System.out.println("server 读取数据完毕..");

        ctx.flush();

//刷新后才将数据发出到SocketChannel

       }

       @Override

       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

                     throws Exception {

              cause.printStackTrace();

              ctx.close();

       }

}

3.2.4.   客户端启动类

package com.netty.demo.client;

 

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

/**

 * • 连接服务器写数据到服务器等待接受服务器返回相同的数据关闭连接

* @author wilson

*/

public class EchoClient {

 

       private final String host;

       private final int port;

 

       public EchoClient(String host, int port) {

              this.host = host;

              this.port = port;

       }

 

       public void start() throws Exception {

              EventLoopGroup nioEventLoopGroup = null;

              try {

                     //创建Bootstrap对象用来引导启动客户端,客户端引导类。

                     Bootstrap bootstrap = new Bootstrap();

                     //创建EventLoopGroup对象并设置到Bootstrap中,EventLoopGroup可以理解为是一个线程池,这个线程池用来处理连接、接受数据、发送数据

                     nioEventLoopGroup = new NioEventLoopGroup();

                     //创建InetSocketAddress并设置到Bootstrap中,InetSocketAddress是指定连接的服务器地址

                     bootstrap.group(nioEventLoopGroup)

.channel(NioSocketChannel.class)

.remoteAddress(new InetSocketAddress(host, port))

                                   .handler(new ChannelInitializer<SocketChannel>() {

                                   //添加一个ChannelHandler,客户端成功连接服务器后就会被执行,用来存放处理业务逻辑的地方。

Hadoop学习(五)——轻量级RPC框架

                                          @Override

                                          protected void initChannel(SocketChannel ch)

                                                        throws Exception {

                                                 ch.pipeline()

.addLast(new EchoClientHandler());

 

                                          }

                                   });

                     // • 调用Bootstrap.connect()来连接服务器

                     ChannelFuture f = bootstrap.connect().sync();

                     // • 最后关闭EventLoopGroup来释放资源

                     f.channel().closeFuture().sync();

              } finally {

                     nioEventLoopGroup.shutdownGracefully().sync();

              }

       }

 

       public static void main(String[] args) throws Exception {

              new EchoClient("localhost", 20000).start();

       }

}


3.2.5.   客户端回调方法

package com.netty.demo.client;

 

import io.netty.buffer.ByteBuf;

import io.netty.buffer.ByteBufUtil;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

        

       public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { 

             //客户端连接服务器后被调用

           @Override 

           public void channelActive(ChannelHandlerContext ctx) throws Exception { 

                  System.out.println("客户端连接服务器,开始发送数据……");

                  byte[] req = "QUERY TIME ORDER".getBytes();

                  ByteBuf  firstMessage = Unpooled.buffer(req.length);

               firstMessage.writeBytes(req);

               ctx.writeAndFlush(firstMessage); 

           } 

           //•    从服务器接收到数据后调用

           @Override 

           protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 

                   System.out.println("client 读取server数据..");

                //服务端返回消息后

                ByteBuf buf = (ByteBuf) msg;

                byte[] req = new byte[buf.readableBytes()];

                buf.readBytes(req);

                String body = new String(req, "UTF-8");

                System.out.println("服务端数据为 :" + body);

         //• 发生异常时被调用

           @Override 

           public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 

                   System.out.println("client exceptionCaught..");

                // 释放资源

                ctx.close();

           } 

       } 

3.3.  netty中handler的执行顺序

3.3.1.   简介

Handler在netty中,无疑占据着非常重要的地位。Handler与Servlet中的filter很像,通过Handler可以完成通讯报文的解码编码、拦截指定的报文、统一对日志错误进行处理、统一对请求进行计数、控制Handler执行与否。一句话,没有它做不到的只有你想不到的。

Netty中的所有handler都实现自ChannelHandler接口。按照输出输出来分,分为ChannelInboundHandler、ChannelOutboundHandler两大类。ChannelInboundHandler对从客户端发往服务器的报文进行处理,一般用来执行解码、读取客户端数据、进行业务处理等;ChannelOutboundHandler对从服务器发往客户端的报文进行处理,一般用来进行编码、发送报文到客户端。

Netty中,可以注册多个handler。ChannelInboundHandler按照注册的先后顺序执行;ChannelOutboundHandler按照注册的先后顺序逆序执行,如下图所示,按照注册的先后顺序对Handler进行排序,request进入Netty后的执行顺序为:

Hadoop学习(五)——轻量级RPC框架

3.3.2.   代码

服务端:

Hadoop学习(五)——轻量级RPC框架

如上面的四个handler——EchInHandler1;EchInHandler2;EchOutHandler1;EchOutHandler2;

In的handler要顺序执行先执行1,然后执行2,而对于out,则反向执行,即先执行2,后执行1,如果代码的顺序进行修改,如下图:

Hadoop学习(五)——轻量级RPC框架 Hadoop学习(五)——轻量级RPC框架

顺序依然先2后1,但是如果将out放在in后面,则这两个out不会执行,就像上图中的顺序是不执行的。

Handler1

Hadoop学习(五)——轻量级RPC框架

fireChannelRead()是将数据流读入到另一个inHandler中,而write()是为了将数据流读入到OutHandler中。

Handler2

Hadoop学习(五)——轻量级RPC框架

Hadoop学习(五)——轻量级RPC框架

outHandler1

Hadoop学习(五)——轻量级RPC框架

outHandler2

Hadoop学习(五)——轻量级RPC框架

ClientHandler

Hadoop学习(五)——轻量级RPC框架

Hadoop学习(五)——轻量级RPC框架

Client:

Hadoop学习(五)——轻量级RPC框架

3.3.3.   总结

在使用Handler的过程中,需要注意:

1、ChannelInboundHandler之间的传递,通过调用 ctx.fireChannelRead(msg) 实现;调用ctx.write(msg) 将传递到ChannelOutboundHandler。

2、ctx.write()方法执行后,需要调用flush()方法才能令它立即执行。

3、流水线pipeline中outhandler不能放在最后,否则不生效。

4、Handler的消费处理放在最后一个处理。

3.4.  netty发送对象

3.4.1.   简介

Netty中,通讯的双方建立连接后,会把数据按照ByteBuf的方式进行传输,例如http协议中,就是通过HttpRequestDecoder对ByteBuf数据流进行处理,转换成http的对象。基于这个思路,我自定义一种通讯协议:Server和客户端直接传输java对象。

实现的原理是通过Encoder把java对象转换成ByteBuf流进行传输,通过Decoder把ByteBuf转换成java对象进行处理,处理逻辑如下图所示:

Hadoop学习(五)——轻量级RPC框架

3.4.2.   代码

Netty一直发送的是字符串,如何进行对象发送:

只需要加一个序列化与反序列化。

那如何发送person对象。

Netty给的是bytebuff,需要转化成byte[],然后反序列化成String。

这是JAVA的IO流:

Hadoop学习(五)——轻量级RPC框架

对象的序列化与反序列化用的工具为:

Hadoop学习(五)——轻量级RPC框架

需要传递的对象为:

Hadoop学习(五)——轻量级RPC框架

客户端为:

Hadoop学习(五)——轻量级RPC框架

Hadoop学习(五)——轻量级RPC框架

Hadoop学习(五)——轻量级RPC框架

服务端:

Hadoop学习(五)——轻量级RPC框架

Hadoop学习(五)——轻量级RPC框架

Hadoop学习(五)——轻量级RPC框架

工具类:

Hadoop学习(五)——轻量级RPC框架

Hadoop学习(五)——轻量级RPC框架

Hadoop学习(五)——轻量级RPC框架

Hadoop学习(五)——轻量级RPC框架

Hadoop学习(五)——轻量级RPC框架 

NIO的优势不在于速度,传统的IO会在多个地方行程阻塞;对服务器的性能影响比较大,

Hadoop学习(五)——轻量级RPC框架

而NIO则会取消这些阻塞,减小服务器压力。

4.  Spring(IOC/AOP)注解学习

4.1.  spring的初始化顺序

在spring的配置文件中配置bean,如下

Hadoop学习(五)——轻量级RPC框架

通过bean中的字符串构造对象,用的是反射的内容,xml文件中的注解方式有两种:其一、直接用component-scan,然后再在各个class的上面写上@component,程序加载时会自动扫描各个类,然后将其加载到内存中。

其二、用一个个的bean注解,添加到spring.xml文件中,然后手动加载文件,使各个类加载到内存中。

         当程序加载各个类的时候,会自动运行各个类中的构造函数,以此形成对象。

在One类和Two类中,分别实现一个参数的构造如下

Hadoop学习(五)——轻量级RPC框架Hadoop学习(五)——轻量级RPC框架

加载spring配置文件,初始化bean如下

Hadoop学习(五)——轻量级RPC框架

那么。结果如何呢?

Hadoop学习(五)——轻量级RPC框架

结论:spring会按照bean的顺序依次初始化xml中配置的所有bean

如何进行自定义注解:

首先需要在某个需要自定义注解的类的上方添加自定义注解的名称:@rpcserver;然后用context来获取到所有的标注有rpcserver的注解的类:

Map<String, Object> serviceBeanMap= ctx

                .getBeansWithAnnotation(RpcService.class);

然后就可以使用,但是自定义注解需要一定的规则:

1)  自定义注解类

Hadoop学习(五)——轻量级RPC框架

VM将在运行期也保留注解,因此可以通过反射机制读取注解的信息。

Component是为了让spring去扫描;

2)  然后在另外的类中加注解:

Hadoop学习(五)——轻量级RPC框架

不仅可以运行rpcserver对象,而且可以传递相应的参数;

3)  然后再到主函数中,加载这一类对象,即可:

Hadoop学习(五)——轻量级RPC框架

上面的Myserver是spring自己标注的类,然后在spring2.xml中配置了相应的包用来加载,当main函数加载spring2.xml文件时,会将所有@component的类加载进入内存中,加载的过程中,Myserver类也会被加载,顾Myserver的构造函数也会被执行,在构造Myserver时发现,这一个类实现了ApplicationContextAware接口,因此会实现这一接口中的方法,即:setApplicationContext方法,这一方法会通过反射方式将所有建有rpcserver标签的类全部构建起来。

Hadoop学习(五)——轻量级RPC框架

这个类上的注解也可以被引入:

Hadoop学习(五)——轻量级RPC框架

Stringvalue = serviceBean.getClass().GetAnnotation(Rpcservice.class).value();

         类中有各种注解,spring中是允许构建的,但是有些注解虽然构建,spring却认不出来,此时需要反射的方式将这一自定义的注解加入到spring中。

       另外还有一些方法可以加载自定义的注解类:

Hadoop学习(五)——轻量级RPC框架        

Test方法是可以直接运行的,将spring与Junit结合,类Myserver3中有个autowired注解,此时程序会将helloservice下的所有类注入进来。

4.1.1.   通过ApplicationContextAware加载Spring上下文环境

在One中实现ApplicationContextAware接口会出现如何的变换呢?

Hadoop学习(五)——轻量级RPC框架

结果:

Hadoop学习(五)——轻量级RPC框架

4.1.2.   InitializingBean的作用

在One中实现InitializingBean接口呢?

Hadoop学习(五)——轻量级RPC框架

结果:

Hadoop学习(五)——轻量级RPC框架

4.1.3.   如果使用注解@Component

使用@Component注入类,那么它的顺序是如何呢?

4.1.4.   结论

1、  spring先检查注解注入的bean,并将它们实例化

2、  然后spring初始化bean的顺序是按照xml中配置的顺序依次执行构造

3、  如果某个类实现了ApplicationContextAware接口,会在类初始化完成后调用setApplicationContext()方法进行操作

4、  如果某个类实现了InitializingBean接口,会在类初始化完成后,并在setApplicationContext()方法执行完毕后,调用afterPropertiesSet()方法进行操作

4.2.  注解使用回顾

         1、在spring中,用注解来向Spring容器注册Bean。需要在applicationContext.xml中注册<context:component-scanbase-package=”pagkage1[,pagkage2,…,pagkageN]”/>。

         2、如果某个类的头上带有特定的注解@Component/@Repository/@Service/@Controller,就会将这个对象作为Bean注册进Spring容器

         3、在使用spring管理的bean时,无需在对调用的对象进行new的过程,只需使用@Autowired将需要的bean注入本类即可

4.3.  自定义注解

4.3.1.   解释

1、自定义注解的作用:在反射中获取注解,以取得注解修饰的“类、方法、属性”的相关解释。

2、java内置注解

     @Target 表示该注解用于什么地方,可能的 ElemenetType 参数包括: 

            ElemenetType.CONSTRUCTOR   构造器声明 

            ElemenetType.FIELD   域声明(包括 enum 实例) 

            ElemenetType.LOCAL_VARIABLE   局部变量声明 

            ElemenetType.METHOD   方法声明 

            ElemenetType.PACKAGE   包声明 

            ElemenetType.PARAMETER   参数声明 

            ElemenetType.TYPE   类,接口(包括注解类型)或enum声明

      @Retention 表示在什么级别保存该注解信息。可选的 RetentionPolicy 参数包括: 

           RetentionPolicy.SOURCE   注解将被编译器丢弃 

           RetentionPolicy.CLASS   注解在class文件中可用,但会被VM丢弃 

           RetentionPolicy.RUNTIME   JVM将在运行期也保留注释,因此可以通过反射机制读取注解的信息。

4.3.2.   实现

定义自定义注解

@Target({ ElementType.TYPE })//注解用在接口上

@Retention(RetentionPolicy.RUNTIME)//VM将在运行期也保留注释,因此可以通过反射机制读取注解的信息

@Component

public @interface RpcService {

         String value();

}

2、将直接类加到需要使用的类上,我们可以通过获取注解,来得到这个类

@RpcService("HelloService")

public class HelloServiceImpl implements HelloService {

    public String hello(String name) {

        return "Hello! " + name;

    }

}

3、类实现的接口

public interface HelloService {

    String hello(String name);

}

4、通过ApplicationContext获取所有标记这个注解的类

@Component

public class MyServer implements ApplicationContextAware {

         @SuppressWarnings("resource")

         public static void main(String[] args) {

       new ClassPathXmlApplicationContext("spring2.xml");

    }

         public void setApplicationContext(ApplicationContext ctx)

                            throws BeansException {

                   Map<String, Object> serviceBeanMap = ctx

                                     .getBeansWithAnnotation(RpcService.class);

                   for (Object serviceBean : serviceBeanMap.values()) {

                            try {

                                     Method method = serviceBean.getClass().getMethod("hello", new Class[]{String.class});

                                     Object invoke = method.invoke(serviceBean, "bbb");

                                     System.out.println(invoke);

                            } catch (Exception e) {

                                     e.printStackTrace();

                            }

                   }

         }

}

5、  结合spring实现junit测试

@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration(locations = "classpath:spring2.xml")

public class MyServer implements ApplicationContextAware {

         @Test

         public void helloTest1() {

         }

         public void setApplicationContext(ApplicationContext ctx)

                            throws BeansException {

                   Map<String, Object> serviceBeanMap = ctx

                                     .getBeansWithAnnotation(RpcService.class);

                   for (Object serviceBean : serviceBeanMap.values()) {

                            try {

                                     Method method = serviceBean.getClass().getMethod("hello",

                                                        new Class[] { String.class });

                                     Object invoke = method.invoke(serviceBean, "bbb");

                                     System.out.println(invoke);

                            } catch (Exception e) {

                                     e.printStackTrace();

                            }

                   }

         }

}

5.  轻量级RPC框架开发

5.1.  轻量级RPC框架需求分析及原理分析

5.1.1.   netty实现的RPC的缺点

rpc自身的框架可以直接构建:

Hadoop学习(五)——轻量级RPC框架

在实际应用中,一般构建业务需要的类,实现业务需要的接口,此时,该如何使用rpc服务的类,如下:

Hadoop学习(五)——轻量级RPC框架

程序构建的是一个HelloServiceImpl类,实现了HelloService接口,如果要调用rpc的服务,此时只需要在注解中添加@RPCService(HelloService.class)注解。

         用上面的方式,程序员只需要自己创建一个类,然后加上公司提供的注解:rpcservice,则该类即可对外提供服务。

         具体该如何实现?

         在完成上面要求之前,需要在配置文件(spring.xml)中进行相应的配置:

         serviceRegistry与rpcserver两个类是jar包中写好的类,因此无法进行修改,此时将这两个类导入到文件中,然后引入自动扫包标签。

         Rpcserver类中注入了ApplicationContextAware,并通过反射的方式(如上面介绍)自定义了rpcserver标签,然后在spring.xml文件中定义扫包标签,此时,启动程序则会自动的启动rpcserver服务。

Hadoop学习(五)——轻量级RPC框架

可视化其流程为:

1)  用户构建工程,其中实现helloserverimpl类,并加上rpcserver注解,同时确定一个spring.xml文件,引入rpcserver的bean;

Hadoop学习(五)——轻量级RPC框架

2)  然后构建bootstrap,在其中的main方法中引入spring.xml文件,运行时便会启动

Springcontext,之后spring就开始工作了,此时spring就会扫描工程中所有的注解,一扫描就会扫到bean中的各种类。

Hadoop学习(五)——轻量级RPC框架

3)  spring将bean中的内容进行构造,构造出框架中的rpcserver,rpcserver中要implements contextAware,其中有一个方法是setApplicationContext(),可以在其中启动netty服务。

4)  在setApplicationContext()中定义一个beanmap,然后netty.start(),接下来只需要@rpcserver标签就可以了。

下面可以具体看看RPCServer

public class RpcServer implements ApplicationContextAware, InitializingBean {

    private static final Logger LOGGER= LoggerFactory.getLogger(RpcServer.class);

    private String serverAddress;

    privateServiceRegistry serviceRegistry;

    private Map<String,Object> handlerMap = new HashMap<String, Object>();

    publicRpcServer(String serverAddress) {

        this.serverAddress = serverAddress;

    }

publicRpcServer(String serverAddress, ServiceRegistry serviceRegistry){

        this.serverAddress = serverAddress;

        this.serviceRegistry = serviceRegistry;

    }

    /**

     * 通过注解,获取RpcService.class,将它放到map

     */

    public void setApplicationContext(ApplicationContext ctx)

            throws BeansException{

        Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class);

        if (MapUtils.isNotEmpty(serviceBeanMap)) {

            for (Object serviceBean : serviceBeanMap.values()) {

                String interfaceName= serviceBean.getClass().getAnnotation(RpcService.class).value().getName();

                handlerMap.put(interfaceName, serviceBean);

            }

        }

    }

    /**

     * 服务端启动类

     */

    public void afterPropertiesSet() throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup();

        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap bootstrap = new ServerBootstrap();

            bootstrap.group(bossGroup, workerGroup)

                           .channel(NioServerSocketChannel.class)

                          .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override

                        public void initChannel(SocketChannel channel)

                                   throws Exception {

                                   channel.pipeline()

                                   .addLast(new RpcDecoder(RpcRequest.class))

                                // 注册解码              in-1

                                .addLast(new RpcEncoder(RpcResponse.class))

                                // 注册编码            out

                                .addLast(new RpcHandler(handlerMap));

                                //注册RpcHandler    in-2

                        }

                    }).option(ChannelOption.SO_BACKLOG, 128)

                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            String[] array = serverAddress.split(":");

            String host = array[0];

            int port = Integer.parseInt(array[1]);

            ChannelFuture future = bootstrap.bind(host, port).sync();

            LOGGER.debug("server started on port {}", port);

            if (serviceRegistry != null) {

                serviceRegistry.register(serverAddress);

            }

            future.channel().closeFuture().sync();

        } finally {

            workerGroup.shutdownGracefully();

            bossGroup.shutdownGracefully();

        }

    }

}

在此基础上接下来netty业务该如何操作:

1)  绑定消息处理流水线;

2)  in解码用户的请求;

3)  in业务调用的handler;

4)  out将结果编码的handler

同时调用registerServer注册:

Hadoop学习(五)——轻量级RPC框架

用户请求进来后会通过request进入到netty的第一个handler中,第一个handler往往是一个decode方法,即RPCDecoder,在此类中进行decode方法调用,对文件进行解码;

Hadoop学习(五)——轻量级RPC框架

在deserialize中进行反序列化:

Hadoop学习(五)——轻量级RPC框架

此时decode中才会将反序列化的数据add到out中。

一旦有数据往下传输,数据会进入到netty中的第二个handler中,

即:new RpcHandler(handlerMap);在RpcHandler中有一个channelread0的方法,一旦有数据进来,数据会进入到channelRead0()中,数据会以request的方式传输,进入到handle方法中,channelRead0结束之后就会进入到out中,

Hadoop学习(五)——轻量级RPC框架

Hadoop学习(五)——轻量级RPC框架

Hadoop学习(五)——轻量级RPC框架

Hadoop学习(五)——轻量级RPC框架

out中有一个encode方法用来编码。

Hadoop学习(五)——轻量级RPC框架

截止到目前,服务端就将请求做完了,这条线是用户有请求时便会执行的;

另外还有一条线是程序一启动就会执行的,即实现向zookeeper注册,对于用户来讲并不知道服务器在哪里,因此需要用zookeeper来进行导引,在RPCserver中,有serviceRegister类:

 Hadoop学习(五)——轻量级RPC框架

然后将这个serviceRegistry注入到rpcserver中去,因此rpcserver调用运行时

serviceRegistry不为空:

Hadoop学习(五)——轻量级RPC框架

Hadoop学习(五)——轻量级RPC框架 

上面serverAddress地址为服务器的地址,即nettyserver的地址。

在注册类中,主要有三个文件:

Hadoop学习(五)——轻量级RPC框架

serviceDiscovery.java主要针对客户端;serviceRegistry.java主要针对服务端;分别安装在两个jar包里。

类文件为:

/**

 * 服务注册ZK 在该架构中扮演了服务注册表的角色,用于注册所有服务器的地址与端口,并对客户端提供服务发现的功能

 */

publicclass ServiceRegistry {

     privatestaticfinal Logger LOGGER= LoggerFactory.getLogger(ServiceRegistry.class);

    private CountDownLatchlatch = new CountDownLatch(1);

    private String registryAddress;

    publicServiceRegistry(String registryAddress) {

        this.registryAddress = registryAddress;

    }

    /**

     * 创建zookeeper链接

     * @param data

     */

    public void register(String data) {

        if (data != null) {

            ZooKeeper zk =connectServer();

            if (zk!= null) {

                createNode(zk,data);

            }

        }

    }

    /**

     * 创建zookeeper链接,监听

     * @return

     */

    private ZooKeeperconnectServer() {

        ZooKeeper zk = null;

        try {

            zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT,

                    new Watcher() {

                    public void process(WatchedEvent event) {

                            if (event.getState() == Event.KeeperState.SyncConnected) {

                                latch.countDown();

                            }

                        }

                    });

            latch.await();

        } catch (Exception e){

            LOGGER.error("", e);

        }

        return zk;

    }

    /**

     * 创建节点

     * @param zk

     * @param data

     */

    private void createNode(ZooKeeper zk, String data) {

        try {

            byte[] bytes = data.getBytes();

            if (zk.exists(Constant.ZK_REGISTRY_PATH, null) == null) {

                zk.create(Constant.ZK_REGISTRY_PATH, null, Ids.OPEN_ACL_UNSAFE,

                        CreateMode.PERSISTENT);

            }

            String path = zk.create(Constant.ZK_DATA_PATH, bytes,

                    Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            LOGGER.debug("create zookeeper node ({} =>{})", path, data);

        } catch (Exception e){

            LOGGER.error("", e);

        }

    }

}

则这个如何向zk注册,serviceRegistry在注册的时候传进来的即为zk的地址,

Hadoop学习(五)——轻量级RPC框架

在rpcserver类中调用的是这个类下的register方法,registryAddress的地址为zk的地址。

Hadoop学习(五)——轻量级RPC框架

下图中的地址为serverAddress的地址,下一句的目的是为了将serverAddress注册到zk中,

Hadoop学习(五)——轻量级RPC框架

在register方法中先new一个zk,即connectServer方法:

Hadoop学习(五)——轻量级RPC框架

返回一个zk,并用这个zk创建并返回一个节点:

Hadoop学习(五)——轻量级RPC框架 

创建完成节点后,就像zk中注册本服务器的地址信息。

Hadoop学习(五)——轻量级RPC框架

以上为服务端的设计,客户端该如何设计呢?

客户端的类为:

/**

 * RPC 客户端(用于发送 RPC 请求)

 */

public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {

    private static final Logger LOGGER= LoggerFactory.getLogger(RpcClient.class);

    private String host;

    private int port;

    private RpcResponse response;

    private final Object obj = new Object();

    public RpcClient(String host, intport) {

        this.host = host;

        this.port = port;

    }

    /**

     * 链接服务端,发送消息

     * @param request

     * @return

     * @throws Exception

     */

    public RpcResponsesend(RpcRequest request) throws Exception {

        EventLoopGroup group = new NioEventLoopGroup();

        try {

            Bootstrap bootstrap = new Bootstrap();

            bootstrap.group(group).channel(NioSocketChannel.class)

                    .handler(new ChannelInitializer<SocketChannel>() {

                        @Override

                        publicvoid initChannel(SocketChannel channel)

                                throws Exception {

                            // pipeline中添加编码、解码、业务处理的handler

                            channel.pipeline()

                                .addLast(new RpcEncoder(RpcRequest.class))

                                .addLast(new RpcDecoder(RpcResponse.class))

                                .addLast(RpcClient.this);

                        }

                    }).option(ChannelOption.SO_KEEPALIVE, true);

            // 链接服务器

            ChannelFuture future = bootstrap.connect(host, port).sync();

            future.channel().writeAndFlush(request).sync();

            // 用线程等待的方式决定是否关闭连接

            synchronized(obj) {

                obj.wait();

            }

            if (response != null) {

                future.channel().closeFuture().sync();

            }

            returnresponse;

        } finally {

            group.shutdownGracefully();

        }

    }

    /**

     * 读取服务端的返回结果

     */

    @Override

    public void channelRead0(ChannelHandlerContext ctx, RpcResponse response)

               throws Exception {

        this.response = response;

        synchronized(obj) {

            obj.notifyAll();

        }

    }

    /**

     * 异常处理

     */

    @Override

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

               throws Exception {

        LOGGER.error("client caught exception", cause);

        ctx.close();

    }

}

代理类的处理方式:

/**

 * RPC 代理(用于创建 RPC 服务代理)

 */

public class RpcProxy {

    private String serverAddress;

    private ServiceDiscovery serviceDiscovery;

    public RpcProxy(String serverAddress) {

        this.serverAddress = serverAddress;

    }

    public RpcProxy(ServiceDiscovery serviceDiscovery) {

        this.serviceDiscovery = serviceDiscovery;

    }

    /**

     * 创建代理

     * @param interfaceClass

     * @return

     */

    @SuppressWarnings("unchecked")

    public <T> Tcreate(Class<?> interfaceClass) {

        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),

                newClass<?>[] { interfaceClass }, newInvocationHandler() {

                    public Objectinvoke(Object proxy, Method method,

                            Object[] args) throws Throwable {

                        //创建RpcRequest,封装被代理类的属性

                        RpcRequest request = new RpcRequest();

                        request.setRequestId(UUID.randomUUID().toString());

                        request.setClassName(method.getDeclaringClass()

                                .getName());

                        request.setMethodName(method.getName());

                         request.setParameterTypes(method.getParameterTypes());

                        request.setParameters(args);

                        //查找服务

                        if (serviceDiscovery != null) {

                            serverAddress = serviceDiscovery.discover();

                        }

                        //随机获取服务的地址

                        String[] array = serverAddress.split(":");

                        String host = array[0];

                        intport = Integer.parseInt(array[1]);

                        //创建RpcClient,链接服务端

                        RpcClient client = new RpcClient(host, port);

                        //通过netty实现RPC

                        RpcResponse response = client.send(request);

                        //返回信息

                        if (response.isError()) {

                            throwresponse.getError();

                        } else {

                            returnresponse.getResult();

                        }

                    }

                });

         }

}

客户端的设计:

 Hadoop学习(五)——轻量级RPC框架

用proxy创建一个HelloServer的业务代理,然后用动态代理创建HelloServer业务类的对应方法。

在代理类proxy中,可以配置相应的request中,

Hadoop学习(五)——轻量级RPC框架

以此来确定请求专递给谁,并且传递什么内容,紧接着发现服务器在哪里;

Hadoop学习(五)——轻量级RPC框架

在discover中如何发现服务器:

Hadoop学习(五)——轻量级RPC框架

如果size大于零,说明有服务器,就可以发送相应的请求。

此时可以安装负载均衡的算法。

Hadoop学习(五)——轻量级RPC框架

在发送之前会有一个编码的过程,这一过程是在send中实现的,然后会发送给到服务端。

Spring中应配置为:

Hadoop学习(五)——轻量级RPC框架

Hadoop学习(五)——轻量级RPC框架

客户端的示意图为:

1)  创建spring.xml文件并注入tomcat中的listener:springlistener,spring一旦启动,回去加载配置文件spring.xml,构造出rpcproxy,并交给rpcproxy类,;

2)  用户会用rpcproxy.create(业务接口类.class),创建业务接口service,然后用这个service调用hello方法。

Hadoop学习(五)——轻量级RPC框架

3)  Create创建一个动态代理对象,这个service调用的便是动态代理对象中的invoke方法,在这invoke方法中发现服务器的地址,封装request对象,启动nettyclient客户端,调用send方法,调用了netty中的out流水线,就会到达服务端。

4)  服务端处理完成后,就会返回response中,进行反序列化,从中提取result并返回。

Hadoop学习(五)——轻量级RPC框架

在我们平常使用的RPC中,例如webservice,使用的习惯类似于下图:

Hadoop学习(五)——轻量级RPC框架

         但是netty的实现过于底层,我们不能够像以前一样只关心方法的调用,而是要关心数据的传输,对于不熟悉netty的开发者,需要了解很多netty的概念和逻辑,才能实现RPC的调用。

         应上面的需求,我们需要基于netty实现一个我们熟悉的RPC框架。逻辑如下:

Hadoop学习(五)——轻量级RPC框架

5.2.  zookeeper API简单使用及框架介绍

5.2.1.   zk在框架中的实现

         在上面的框架中,server端存在着一个问题,就是单点问题,也就是说,当服务端“挂了”之后,框架的使用就造成了单点屏障。

         我们可以通过zookeeper来实现服务端的负载均衡

Hadoop学习(五)——轻量级RPC框架