Spark内置框架rpc通讯机制及RpcEnv基础设施-Spark商业环境实战

Spark商业环境实战及调优进阶系列

本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark源码解读及商业实战指导,请持续关注本套博客。版权声明:本套Spark源码解读及商业实战归作者所有,禁止转载,欢迎学习。

1. Spark 内置框架rpc通讯机制

TransportContext 内部握有创建TransPortClient和TransPortServer的方法实现,但却属于最底层的RPC通讯设施。为什么呢?

因为成员变量RPCHandler是抽象的,所以TransportContext只能为最底层的通讯基础。上层为NettyRPCEnv高层封装,并持有TransportContext引用,在TransportContext中传入NettyRpcHandler实体,来实现netty通讯回调Handler处理。请参照第二部分详解。

 /* The TransportServer and TransportClientFactory both create a TransportChannelHandler for each
 * channel. As each TransportChannelHandler contains a TransportClient, this enables server
 * processes to send messages back to the client on an existing channel.
 */
 
 
  public class TransportContext {
  private final Logger logger = LoggerFactory.getLogger(TransportContext.class);
  private final TransportConf conf;
  private final RpcHandler rpcHandler;
  private final boolean closeIdleConnections;

  private final MessageEncoder encoder;
  private final MessageDecoder decoder;

  public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
    this(conf, rpcHandler, false);
  }

1.1 客户端和服务端统一的消息接收处理器 TransportChannelHandlerer

transportClient 和TransportServer 在配置Netty的pipeLine的handler处理器时,均采用TransportChannelHandler,

不过transportClient对应的是TransportResponseHander,TransportServer对应的的是TransportRequestHander。
如下图所示,在进行消息处理时,首先会经过TransportChannelHandler根据消息类型进行处理器选择。

Spark内置框架rpc通讯机制及RpcEnv基础设施-Spark商业环境实战

1.2 transportClient对应的是ResponseMessage

客户端一旦发送消息(均为Request消息),就会在

private final Map<Long, RpcResponseCallback> outstandingRpcs;

private final Map<StreamChunkId, ChunkReceivedCallback> outstandingFetches

中缓存,用于回调处理。

Spark内置框架rpc通讯机制及RpcEnv基础设施-Spark商业环境实战

1.3 transportServer对应的是RequestMessage

服务端接收消息类型(均为Request消息)

  • ChunkFetchRequest
  • RpcRequest
  • OneWayMessage
  • StremRequest

服务端响应类型(均为Response消息):

  • ChunkFetchSucess
  • ChunkFetchFailure
  • RpcResponse
  • RpcFailure

2. Spark RpcEnv基础设施

2.1 上层建筑NettyRPCEnv

上层建筑NettyRPCEnv,持有TransportContext引用,在TransportContext中传入NettyRpcHandler实体,来实现netty通讯回调Handler处理

  • Dispatcher
  • TransportContext
  • TransPortClientFactroy
  • TransportServer
  • TransportConf

2.2 RpcEndPoint 与 RPCEndPointRef 端点

  • RpcEndPoint 为服务端
  • RPCEndPointRef 为客户端

2.2 Dispacher 与 Inbox 与 Outbox

  • 一个端点对应一个Dispacher,一个Inbox , 多个OutBox
  1. RpcEndpoint:RPC端点 ,Spark针对于每个节点(Client/Master/Worker)都称之一个Rpc端点 ,且都实现RpcEndpoint接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送(询问)则调用Dispatcher
  2. RpcEnv:RPC上下文环境,每个Rpc端点运行时依赖的上下文环境称之为RpcEnv
  3. Dispatcher:消息分发器,针对于RPC端点需要发送消息或者从远程RPC接收到的消息,分发至对应的指令收件箱/发件箱。如果指令接收方是自己存入收件箱,如果指令接收方为非自身端点,则放入发件箱
  4. Inbox:指令消息收件箱,一个本地端点对应一个收件箱,Dispatcher在每次向Inbox存入消息时,都将对应EndpointData加入内部待Receiver Queue中,另外Dispatcher创建时会启动一个单独线程进行轮询Receiver Queue,进行收件箱消息消费
  5. OutBox:指令消息发件箱,一个远程端点对应一个发件箱,当消息放入Outbox后,紧接着将消息通过TransportClient发送出去。消息放入发件箱以及发送过程是在同一个线程中进行,这样做的主要原因是远程消息分为RpcOutboxMessage, OneWayOutboxMessage两种消息,而针对于需要应答的消息直接发送且需要得到结果进行处理
  6. TransportClient:Netty通信客户端,根据OutBox消息的receiver信息,请求对应远程TransportServer
  7. TransportServer:Netty通信服务端,一个RPC端点一个TransportServer,接受远程消息后调用Dispatcher分发消息至对应收发件箱

Spark内置框架rpc通讯机制及RpcEnv基础设施-Spark商业环境实战

Spark在Endpoint的设计上核心设计即为Inbox与Outbox,其中Inbox核心要点为:

  1. 内部的处理流程拆分为多个消息指令(InboxMessage)存放入Inbox
  2. 当Dispatcher启动最后,会启动一个名为【dispatcher-event-loop】的线程扫描Inbox待处理InboxMessage,并调用Endpoint根据InboxMessage类型做相应处理
  3. 当Dispatcher启动最后,默认会向Inbox存入OnStart类型的InboxMessage,Endpoint在根据OnStart指令做相关的额外启动工作,三端启动后所有的工作都是对OnStart指令处理衍生出来的,因此可以说OnStart指令是相互通信的源头
  • 注意: 一个端点对应一个Dispacher,一个Inbox , 多个OutBox,可以看到 inbox在Dispacher 中且在EndPointData内部:

     private final RpcHandler rpcHandler;
    /**
    * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
    */
     private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
     private class EndpointData(
        val name: String,
        val endpoint: RpcEndpoint,
        val ref: NettyRpcEndpointRef) {
      val inbox = new Inbox(ref, endpoint)
    }
    private val endpoints = new ConcurrentHashMap[String, EndpointData]
    private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]
    
    // Track the receivers whose inboxes may contain messages.
    private val receivers = new LinkedBlockingQueue[EndpointData]
    

Spark内置框架rpc通讯机制及RpcEnv基础设施-Spark商业环境实战

  • 注意: 一个端点对应一个Dispacher,一个Inbox , 多个OutBox,可以看到 OutBox在NettyRpcEnv内部:

    private[netty] class NettyRpcEnv(
      val conf: SparkConf,
      javaSerializerInstance: JavaSerializerInstance,
      host: String,
      securityManager: SecurityManager) extends RpcEnv(conf) with Logging {
      
      private val dispatcher: Dispatcher = new Dispatcher(this)
      
      private val streamManager = new NettyStreamManager(this)
      private val transportContext = new TransportContext(transportConf,
      new NettyRpcHandler(dispatcher, this, streamManager))
      
    /**
     * A map for [[RpcAddress]] and [[Outbox]]. When we are connecting to a remote [[RpcAddress]],
     * we just put messages to its [[Outbox]] to implement a non-blocking `send` method.
     */
    private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
    

3 结语

秦凯新 于深圳 2018-10-28