rpcx服务框架浅析13-远程调用
RPCX分布式服务框架主要致力于提供高性能和透明化的RPC远程服务调用。
RPCX框架服务消费方向服务提供方发起调用,可分为同步和异步方式。
异步调用
func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
call := new(Call)
call.ServicePath = servicePath
call.ServiceMethod = serviceMethod
meta := ctx.Value(share.ReqMetaDataKey)
if meta != nil { //copy meta in context to meta in requests
call.Metadata = meta.(map[string]string)
}
call.Args = args
call.Reply = reply
if done == nil {
done = make(chan *Call, 10) // buffered.
} else {
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
call.Done = done
client.send(ctx, call)
return call
}
同步调用
func (client *Client) call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error {
seq := new(uint64)
ctx = context.WithValue(ctx, seqKey{}, seq)
Done := client.Go(ctx, servicePath, serviceMethod, args, reply, make(chan *Call, 1)).Done
var err error
select {
case <-ctx.Done(): //cancel by context
client.mutex.Lock()
call := client.pending[*seq]
delete(client.pending, *seq)
client.mutex.Unlock()
if call != nil {
call.Error = ctx.Err()
call.done()
}
return ctx.Err()
case call := <-Done:
err = call.Error
meta := ctx.Value(share.ResMetaDataKey)
if meta != nil && len(call.ResMetadata) > 0 {
resMeta := meta.(map[string]string)
for k, v := range call.ResMetadata {
resMeta[k] = v
}
}
}
return err
}
服务消费方发起调用流程
- 服务消费方发起调用时生成Seq会话序号,服务提供方回复时会带上这个Seq。
- 服务消费方自身维护一个待回复队列,服务消费方每次发起调用后都会生成一个等待回复消息体并放入待回复队列,其实应该是一个map,map[Seq]*Call,Call就是封装的等待回复消息体。
- 服务消费方会启动一个接收协程,循环接收服务提供方的回复。
- 服务消费方发起调用后,同步情况下会等待服务提供方回复。
- 服务提供方回复消息后,服务消费方接收协程收到命令,解析出对应的Seq,然后在等待回复队列中用Seq找到等待回复消息体并唤醒服务消费方。