基于netty+Spring+Zookeeper的分布式RPC框架

开发笔记

参考文章

启动示意图

基于netty+Spring+Zookeeper的分布式RPC框架

对示意图中出现的单词意义解释

provider

服务提供者

consumer

服务消费者

Zookeeper

Zookeeper集群,作为服务的注册中心使用

zk节点结构

模仿dubbo

  • 根路径:/nicerpc
  • 服务路径在根路径下,根据不同的服务名命名:/nicerpc/serviceName,比如/nicerpc/com.w.service.api.UserService,一个服务下有providers,consumers,routers,configurations节点
  • 服务提供者都在服务路径下的providers里:/nicerpc/serviceName/providers/providerHost+"#" +providerPort,比如/nicerpc/com.w.service.api.UserService/192.168.12.12#6666;后期框架优化,需要往节点的数据域附带这些host,port等信息。
  • consumer类似provider
  • 某一个服务的配置信息在/nicerpc/serviceName/configurations下,暂时并未用到
  • 路由信息在/nicerpc/serviceName/routers下,暂时并未用到

step1~6是启动顺序

  • step1,provider的服务暴露

    1. provider启动netty的ServerBootstrap,进入nio的select监听状态
    2. provider扫描指定包,基于Spring的BeanPostProcessor找到标注了@Remote注解的Bean
    3. 根据这些Bean实现的接口的名字——ServiceName——去zk路径/nicerpc/serviceName/providers**册自己为临时带***的节点,如果没有这些节点需要进行初始化建立这些节点;
    4. 同时将这些服务名与其具体实现的映射记录在concurrentHashMap里
  • step2,provider在zk注册完毕后,二者之间就有一条长连接,用于zk监视该机器状态

  • step3,consumer的服务发现

    1. consumer启动netty的Bootstrap,进入nio的select监听状态

    2. consumer扫描指定包,基于Spring的BeanPostProcessor找到标注了@RemoteInvoke的域,给这些域通过Field.set()方法设置一个基于SpringCGLib的代理类,在该代理类的intercept方法里,我们将会去使用netty进行一次rpc调用

    3. consumer通过ServerManager管理与Provider的连接,而在ServerManager内部,通过维护三个ConcurrentHashMap来管理连接

      • realServerPathMap:保存serviceName与当前主机列表的映射
        ​ key:com.nicerpc.nicerpc_demo.api.UserService

        ​ value:set{“192.168.1.2#8081”,“192.168.11.68#8081”}

      • hostAndPortManagerMap:保存serviceName与当前使用的provider所在的主机+端口的映射

        ​ key:com.nicerpc.nicerpc_demo.api.UserService

        ​ value:host#port

      • connectionManagerMap:保存与某台主机上的一个进程(这个进程可能有多个服务)的一条链接(连接缓存)

        ​ key:host#port
        value:ChannelFuture实例

    4. 如果是第一次连接,会去将自己注册到zk的consumers下,并进行服务发现,即向zk拉取相应的serviceName/providers下的所有子节点,根据节点信息更新三个Map,并根据realServerPathMap里的内容进行负载均衡选举,使用选举结果进行异步连接,并更新三个Map。

  • step4,consumer在zk注册完毕后,二者之间就有一条长连接,用于zk监视该机器状态

  • step5,consumer在拿到了目标provider的地址后,发起rpc调用。

    1. 通过初始化时,注册的动态代理类的intercept方法实现,在该方法内,封装一个在consumer-provider里传递的媒介——ClientRequest,他封装了一个请求。
    2. 将clientRequest通过JSON格式化后,编码后发送给provider
    3. 通过Condition等并发工具类,异步获取服务端返回结果,并进行解码,返回给业务代码
  • step6,provider对consumer的rpc调用的响应

    1. 接收到新消息后,经过解码,将Json字符串重新格式化为ClientRequest对象
    2. 拿到请求的serviceName,methodName等,从缓存beanMethodMap中取出bean和method,执行method的invoke方法,将返回值封装进Response内,经过JSON格式化,编码,再异步的发送回consumer

实现细节

对于“当server的状态没有还没有同步到consumer的服务器列表里”这种情况该如何获取连接

一般情况下,因为server的状态通过Zookeeper不能同步的更新到consumer本地的服务器列表(realServerSet)内,所以,当一台甚至几台server同时宕掉而consumer不知道这些server宕掉并且去尝试去调用的时候,可能会造成不应该的消息发送失败。

对于这种情况,我的思考过程如下:

思考第一阶段

当本地对于该主机没有可以用的future缓存的时候,该怎么办?先不考虑真实的server状态是否更新到providers本地缓存表里,也不考虑基于netty的connect操作默认是异步的

我觉得如果没有的话,不管是future == null(的确没连接过)的情况,还是future的channel是isNotActive(可能是provider关闭)的情况,都需要重新建立一个新的连接,原因如下所示:
经过思考,我觉得应该是重新使用负载均衡策略进行选举,因为负载均衡选举是根据最新的providers本地缓存表进行的(至少当时是直接使用providers本地缓存表进行选举的,后面改成了接受一个Set<String>形参,根据这个形参来进行选举),所以他可以保证选举出来的provider都是可用的,假如重连当前的provider的话,如果该provider所在的主机进程关闭,虽然providers本地缓存表会更新,但我们在这里重连还是会失败,所以应该以providers本地缓存表为准,应该直接重新选举,然后根据选举结果进行重新连接,如果连接成功的话就返回,如果连接失败的话就再次递归调用本方法。

思考第二阶段

现在考虑真实的server状态不能同步的更新到consumer的providers本地缓存表里,而且同一台机器被负载均衡选中多次,并且该机器已经下线,该怎么办?
因为providers本地缓存表并不是跟真实的服务器情况同步更新的,所以在真实的服务器情况同步的这段时间内,如果负载均衡每次负载的都是同一台机器(根据负载均衡策略不同是可能发生这种情况的),而这台机器如果是已经下线的(但它的下线状态还没有被更新到providers本地缓存表中),在这段时间内,程序可能会一直递归下去!

所以我们需要在负载均衡前,尝试重连接一次,如果重连接成功的话就返回,如果失败的话,从providers本地缓存表中remove掉,然后进行负载均衡,这样的话,如果所有机器都宕掉,我们也会一台一台的将所有机器删除掉,从而最后在负载均衡的时候发现providers本地缓存表中为空,会抛出异常,终止,就不会无限的递归下去了。

思考第三阶段

考虑基于netty的connect操作默认是异步的

因为netty的connect操作默认是异步的,所以没办法同步的获取到连接是否成功的结果。当然,future支持sync,await等操作(这里,使用netty时,需要注意IO超时,与await超时的区别),但是这些都需要阻塞,都会付出不小的代价,会影响框架的性能。但是,如果不使用这些函数,我们就没办法达到同步获取连接结果的需求,也就没办法通过连接结果来决定是否在providers本地缓存表remove掉当前集群选举出来的这个provider,就还是可能出现思考阶段二出现的问题。

所以,我想到了针对这种想到了名为fastGetConnection的解决方案,中和了以上性能与程序无限递归之间的矛盾。方案如下:
在发现当前主机没有可用的future缓存的时候,首先关闭该future,然后构造一个从providers本地缓存表的副本,从该副本中剔除当前provider,使用这个副本进行fastGetConnection(Set<String> serverSet)操作。

fastGetConnection(Set<String> serverSet):
每次都使用形参serverSet进行负载均衡选举,并使用选举结果进行连接,并使用返回的future.await(long,TimeUnit)设置等待超时时间,当await阻塞完毕后(不管future是否连接完毕,阻塞完毕可能是超时,可能是连接完毕),检查是否连接成功,如果连接成功则返回,否则在传进来的serverSet中remove掉刚刚那个server,并递归调用fastGetConnection,这里我们可以通过对超时时间进行调优,快速的获取一个客户端的连接了,当遇到网络波动的时候,我们选取到的是一个连通最快的provider,而且serverSet.remove(server)这个操作是在副本上进行的,不会影响真实的server列表的更新。

也就是在providers本地缓存表的副本上,进行选举,快速连接,根据连接结果在副本上remove或者返回连接,真实的providers情况不去管它,任其*地更新,哪怕我们remove掉一个只是网络比较慢的但还可以用的provider也不会影响到它在providers本地缓存表是否存在。

PS:增加一个补救措施

然后又经过思考,决定在加一个补救措施,就是normalGetConnection,如果通过fastGetConnection获取不到(这种情况只会发生在把providers本地缓存表副本掏空后,负载均衡选举发生异常的时候),会接着通过normalGetConnection获取。normalGetConnection就仅仅是根据当前的providers本地缓存表列表做一个负载均衡选举,然后根据选举结果直接进行异步连接,不管连接结果是不是成功

存在的问题

await超时时间的设置多少合适,而且也不清楚使用await带来的性能损耗。

以下是知识笔记

Netty

关于addListener方法

  • 经过测试,channelFuture.channel().xxx().addListener(listener)
    方法添加的监听器只对单种IO事件的单次操作有效。
  • 比如,channelFuture.channel().connect().addListener()添加的监听器只会在connect完成后调用,不会在…channel().writeAndFlush()完成后调用
  • 再比如,channelFuture.channel().writeAndFlush().addListener()添加的监听器只会在本次writeAndFlush()完成后执行,假如接着执行了另一次writeAndFlush()操作,该监听器不会再次触发
  • ps:监听器会在isDone()返回true后被立即调用

关于sync方法

  • sync会阻塞到当前IO操作直到其isDone()返回true。
  • isDone()代表着完成了,结果可能是成功,失败,被中断等。
  • 该IO操作是否成功还是得看isSuccess()方法。

关于await()方法

  • await(),await(long,TimeUnit),awaitUnInterrupt等方法,底层调用了Object的wait方法,他会阻塞到isDone返回true,上面说到了,isDone返回true有多种结果,不一定成功。
  • 当设置了超时时间时,只要到达了超时时间,就不管isDone是否返回true了,会直接返回。

关于bootstrap的connect方法

  • 经测试,使用同一个bootstrap去连接同一个主机上的同一个进程(ip、port)都相同,返回的是不同的两个future(也就是两个连接),二者可以并发的读写数据。

dubbo

  • 自动发现: 基于注册中心目录服务,使服务消费方能动态的查找服务提供方,使地址透明,使服务提供方可以平滑增加或减少机器。
  • 集群容错: 提供基于接口方法的透明远程过程调用,包括多协议支持,以及软负载均衡,失败容错,地址路由,动态配置等集群支持。
  • 远程通讯: 提供对多种基于长连接的NIO框架抽象封装,包括多种线程模型,序列化,以及“请求-响应”模式的信息交换方式。

nicerpc实现的功能

  • 客户端超时,超时的链接自动关闭
  • 分离业务模块
  • 增加zk模块,从zk获取服务器列表
  • 客户端动态管理连接
  • Netty实现RPC服务器
  • 定义自己的简单通信协议
  • Client与RPC服务器使用长连接进行异步通信
  • 客户端动态代理使用SpringCGLib,BeanPostProcessor接口
  • 服务器注册到Zookeeper,客户端通过Zookeeper监听服务器状态